You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/11/20 08:14:11 UTC
[1/2] incubator-ariatosca git commit: wip 2 [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/extract_execution_creation_from_workflow_runner 5b35934cd -> da233c730 (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
deleted file mode 100644
index 011c4cc..0000000
--- a/tests/orchestrator/test_workflow_runner.py
+++ /dev/null
@@ -1,726 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import time
-from threading import Thread, Event
-from datetime import datetime
-
-import mock
-import pytest
-
-from aria.modeling import exceptions as modeling_exceptions
-from aria.modeling import models
-from aria.orchestrator import exceptions
-from aria.orchestrator import events
-from aria.orchestrator.workflow_runner import WorkflowRunner
-from aria.orchestrator.workflows.executor.process import ProcessExecutor
-from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine, graph_compiler
-from aria.orchestrator.workflows.executor import thread
-from aria.orchestrator import (
- workflow,
- operation,
-)
-
-from tests import (
- mock as tests_mock,
- storage
-)
-
-from ..fixtures import ( # pylint: disable=unused-import
- plugins_dir,
- plugin_manager,
- fs_model as model,
- resource_storage as resource
-)
-
-custom_events = {
- 'is_resumed': Event(),
- 'is_active': Event(),
- 'execution_cancelled': Event(),
- 'execution_failed': Event(),
-}
-
-
-class TimeoutError(BaseException):
- pass
-
-
-class FailingTask(BaseException):
- pass
-
-
-def test_undeclared_workflow(request):
- # validating a proper error is raised when the workflow is not declared in the service
- with pytest.raises(exceptions.UndeclaredWorkflowError):
- _create_workflow_runner(request, 'undeclared_workflow')
-
-
-def test_missing_workflow_implementation(service, request):
- # validating a proper error is raised when the workflow code path does not exist
- workflow = models.Operation(
- name='test_workflow',
- service=service,
- function='nonexistent.workflow.implementation')
- service.workflows['test_workflow'] = workflow
-
- with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
- _create_workflow_runner(request, 'test_workflow')
-
-
-def test_builtin_workflow_instantiation(request):
- # validates the workflow runner instantiates properly when provided with a builtin workflow
- # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
- workflow_runner = _create_workflow_runner(request, 'install')
- tasks = list(workflow_runner.execution.tasks)
- assert len(tasks) == 18 # expecting 18 tasks for 2 node topology
-
-
-def test_custom_workflow_instantiation(request):
- # validates the workflow runner instantiates properly when provided with a custom workflow
- # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
- mock_workflow = _setup_mock_workflow_in_service(request)
- workflow_runner = _create_workflow_runner(request, mock_workflow)
- tasks = list(workflow_runner.execution.tasks)
- assert len(tasks) == 2 # mock workflow creates only start workflow and end workflow task
-
-
-def test_existing_active_executions(request, service, model):
- existing_active_execution = models.Execution(
- service=service,
- status=models.Execution.STARTED,
- workflow_name='uninstall')
- model.execution.put(existing_active_execution)
- with pytest.raises(exceptions.ActiveExecutionsError):
- _create_workflow_runner(request, 'install')
-
-
-def test_existing_executions_but_no_active_ones(request, service, model):
- existing_terminated_execution = models.Execution(
- service=service,
- status=models.Execution.SUCCEEDED,
- workflow_name='uninstall')
- model.execution.put(existing_terminated_execution)
- # no active executions exist, so no error should be raised
- _create_workflow_runner(request, 'install')
-
-
-def test_default_executor(request):
- # validates the ProcessExecutor is used by the workflow runner by default
- mock_workflow = _setup_mock_workflow_in_service(request)
-
- with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
- _create_workflow_runner(request, mock_workflow)
- _, engine_kwargs = mock_engine_cls.call_args
- assert isinstance(engine_kwargs.get('executors').values()[0], ProcessExecutor)
-
-
-def test_custom_executor(request):
- mock_workflow = _setup_mock_workflow_in_service(request)
-
- custom_executor = mock.MagicMock()
- with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
- _create_workflow_runner(request, mock_workflow, executor=custom_executor)
- _, engine_kwargs = mock_engine_cls.call_args
- assert engine_kwargs.get('executors').values()[0] == custom_executor
-
-
-def test_task_configuration_parameters(request):
- mock_workflow = _setup_mock_workflow_in_service(request)
-
- task_max_attempts = 5
- task_retry_interval = 7
- with mock.patch('aria.orchestrator.workflow_runner.engine.Engine.execute') as \
- mock_engine_execute:
- _create_workflow_runner(request, mock_workflow, task_max_attempts=task_max_attempts,
- task_retry_interval=task_retry_interval).execute()
- _, engine_kwargs = mock_engine_execute.call_args
- assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts
- assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval
-
-
-def test_execute(request, service):
- mock_workflow = _setup_mock_workflow_in_service(request)
-
- mock_engine = mock.MagicMock()
- with mock.patch('aria.orchestrator.workflow_runner.engine.Engine.execute',
- return_value=mock_engine) as mock_engine_execute:
- workflow_runner = _create_workflow_runner(request, mock_workflow)
- workflow_runner.execute()
-
- _, engine_kwargs = mock_engine_execute.call_args
- assert engine_kwargs['ctx'].service.id == service.id
- assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
-
- mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
- resuming=False,
- retry_failed=False)
-
-
-def test_cancel_execution(request):
- mock_workflow = _setup_mock_workflow_in_service(request)
-
- mock_engine = mock.MagicMock()
- with mock.patch('aria.orchestrator.workflow_runner.engine.Engine', return_value=mock_engine):
- workflow_runner = _create_workflow_runner(request, mock_workflow)
- workflow_runner.cancel()
- mock_engine.cancel_execution.assert_called_once_with(ctx=workflow_runner._workflow_context)
-
-
-def test_execution_model_creation(request, service, model):
- mock_workflow = _setup_mock_workflow_in_service(request)
-
- with mock.patch('aria.orchestrator.workflow_runner.engine.Engine'):
- workflow_runner = _create_workflow_runner(request, mock_workflow)
-
- assert model.execution.get(workflow_runner.execution.id) == workflow_runner.execution
- assert workflow_runner.execution.service.id == service.id
- assert workflow_runner.execution.workflow_name == mock_workflow
- assert workflow_runner.execution.created_at <= datetime.utcnow()
- assert workflow_runner.execution.inputs == dict()
-
-
-def test_execution_inputs_override_workflow_inputs(request):
- wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5}
- mock_workflow = _setup_mock_workflow_in_service(
- request,
- inputs=dict((name, models.Input.wrap(name, val)) for name, val
- in wf_inputs.iteritems()))
-
- with mock.patch('aria.orchestrator.workflow_runner.engine.Engine'):
- workflow_runner = _create_workflow_runner(
- request, mock_workflow, inputs={'input2': 'overriding-value2', 'input3': 7})
-
- assert len(workflow_runner.execution.inputs) == 3
- # did not override input1 - expecting the default value from the workflow inputs
- assert workflow_runner.execution.inputs['input1'].value == 'value1'
- # overrode input2
- assert workflow_runner.execution.inputs['input2'].value == 'overriding-value2'
- # overrode input of integer type
- assert workflow_runner.execution.inputs['input3'].value == 7
-
-
-def test_execution_inputs_undeclared_inputs(request):
- mock_workflow = _setup_mock_workflow_in_service(request)
-
- with pytest.raises(modeling_exceptions.UndeclaredInputsException):
- _create_workflow_runner(request, mock_workflow, inputs={'undeclared_input': 'value'})
-
-
-def test_execution_inputs_missing_required_inputs(request):
- mock_workflow = _setup_mock_workflow_in_service(
- request, inputs={'required_input': models.Input.wrap('required_input', value=None)})
-
- with pytest.raises(modeling_exceptions.MissingRequiredInputsException):
- _create_workflow_runner(request, mock_workflow, inputs={})
-
-
-def test_execution_inputs_wrong_type_inputs(request):
- mock_workflow = _setup_mock_workflow_in_service(
- request, inputs={'input': models.Input.wrap('input', 'value')})
-
- with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException):
- _create_workflow_runner(request, mock_workflow, inputs={'input': 5})
-
-
-def test_execution_inputs_builtin_workflow_with_inputs(request):
- # built-in workflows don't have inputs
- with pytest.raises(modeling_exceptions.UndeclaredInputsException):
- _create_workflow_runner(request, 'install', inputs={'undeclared_input': 'value'})
-
-
-def test_workflow_function_parameters(request, tmpdir):
- # validating the workflow function is passed with the
- # merged execution inputs, in dict form
-
- # the workflow function parameters will be written to this file
- output_path = str(tmpdir.join('output'))
- wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5}
-
- mock_workflow = _setup_mock_workflow_in_service(
- request, inputs=dict((name, models.Input.wrap(name, val)) for name, val
- in wf_inputs.iteritems()))
-
- _create_workflow_runner(request, mock_workflow,
- inputs={'input2': 'overriding-value2', 'input3': 7})
-
- with open(output_path) as f:
- wf_call_kwargs = json.load(f)
- assert len(wf_call_kwargs) == 3
- assert wf_call_kwargs.get('input1') == 'value1'
- assert wf_call_kwargs.get('input2') == 'overriding-value2'
- assert wf_call_kwargs.get('input3') == 7
-
-
-@pytest.fixture
-def service(model):
- # sets up a service in the storage
- service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
- service = model.service.get(service_id)
- return service
-
-
-def _setup_mock_workflow_in_service(request, inputs=None):
- # sets up a mock workflow as part of the service, including uploading
- # the workflow code to the service's dir on the resource storage
- service = request.getfixturevalue('service')
- resource = request.getfixturevalue('resource')
-
- source = tests_mock.workflow.__file__
- resource.service_template.upload(str(service.service_template.id), source)
- mock_workflow_name = 'test_workflow'
- arguments = {}
- if inputs:
- for input in inputs.itervalues():
- arguments[input.name] = input.as_argument()
- workflow = models.Operation(
- name=mock_workflow_name,
- service=service,
- function='workflow.mock_workflow',
- inputs=inputs or {},
- arguments=arguments)
- service.workflows[mock_workflow_name] = workflow
- return mock_workflow_name
-
-
-def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
- task_max_attempts=None, task_retry_interval=None):
- # helper method for instantiating a workflow runner
- service_id = request.getfixturevalue('service').id
- model = request.getfixturevalue('model')
- resource = request.getfixturevalue('resource')
- plugin_manager = request.getfixturevalue('plugin_manager')
-
- # task configuration parameters can't be set to None, therefore only
- # passing those if they've been set by the test
- task_configuration_kwargs = dict()
- if task_max_attempts is not None:
- task_configuration_kwargs['task_max_attempts'] = task_max_attempts
- if task_retry_interval is not None:
- task_configuration_kwargs['task_retry_interval'] = task_retry_interval
-
- return WorkflowRunner(
- workflow_name=workflow_name,
- service_id=service_id,
- inputs=inputs or {},
- executor=executor,
- model_storage=model,
- resource_storage=resource,
- plugin_manager=plugin_manager,
- **task_configuration_kwargs)
-
-
-class TestResumableWorkflows(object):
-
- def _create_initial_workflow_runner(
- self, workflow_context, workflow, executor, inputs=None):
-
- service = workflow_context.service
- service.workflows['custom_workflow'] = tests_mock.models.create_operation(
- 'custom_workflow',
- operation_kwargs={
- 'function': '{0}.{1}'.format(__name__, workflow.__name__),
- 'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
- }
- )
- workflow_context.model.service.update(service)
-
- wf_runner = WorkflowRunner(
- service_id=workflow_context.service.id,
- inputs=inputs or {},
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- plugin_manager=None,
- workflow_name='custom_workflow',
- executor=executor)
- return wf_runner
-
- @staticmethod
- def _wait_for_active_and_cancel(workflow_runner):
- if custom_events['is_active'].wait(60) is False:
- raise TimeoutError("is_active wasn't set to True")
- workflow_runner.cancel()
- if custom_events['execution_cancelled'].wait(60) is False:
- raise TimeoutError("Execution did not end")
-
- def test_resume_workflow(self, workflow_context, thread_executor):
- node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
- self._create_interface(workflow_context, node, mock_pass_first_task_only)
-
- wf_runner = self._create_initial_workflow_runner(
- workflow_context, mock_parallel_tasks_workflow, thread_executor,
- inputs={'number_of_tasks': 2})
-
- wf_thread = Thread(target=wf_runner.execute)
- wf_thread.daemon = True
- wf_thread.start()
-
- # Wait for the execution to start
- self._wait_for_active_and_cancel(wf_runner)
- node = workflow_context.model.node.refresh(node)
-
- tasks = workflow_context.model.task.list(filters={'_stub_type': None})
- assert any(task.status == task.SUCCESS for task in tasks)
- assert any(task.status == task.RETRYING for task in tasks)
- custom_events['is_resumed'].set()
- assert any(task.status == task.RETRYING for task in tasks)
-
- # Create a new workflow runner, with an existing execution id. This would cause
- # the old execution to restart.
- new_wf_runner = WorkflowRunner(
- service_id=wf_runner.service.id,
- inputs={},
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- plugin_manager=None,
- execution_id=wf_runner.execution.id,
- executor=thread_executor)
-
- new_wf_runner.execute()
-
- # Wait for it to finish and assert changes.
- node = workflow_context.model.node.refresh(node)
- assert all(task.status == task.SUCCESS for task in tasks)
- assert node.attributes['invocations'].value == 3
- assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
- def test_resume_started_task(self, workflow_context, thread_executor):
- node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
- self._create_interface(workflow_context, node, mock_stuck_task)
-
- wf_runner = self._create_initial_workflow_runner(
- workflow_context, mock_parallel_tasks_workflow, thread_executor,
- inputs={'number_of_tasks': 1})
-
- wf_thread = Thread(target=wf_runner.execute)
- wf_thread.daemon = True
- wf_thread.start()
-
- self._wait_for_active_and_cancel(wf_runner)
- node = workflow_context.model.node.refresh(node)
- task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
- assert node.attributes['invocations'].value == 1
- assert task.status == task.STARTED
- assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
- wf_runner.execution.CANCELLING)
- custom_events['is_resumed'].set()
-
- new_thread_executor = thread.ThreadExecutor()
- try:
- new_wf_runner = WorkflowRunner(
- service_id=wf_runner.service.id,
- inputs={},
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- plugin_manager=None,
- execution_id=wf_runner.execution.id,
- executor=new_thread_executor)
-
- new_wf_runner.execute()
- finally:
- new_thread_executor.close()
-
- # Wait for it to finish and assert changes.
- node = workflow_context.model.node.refresh(node)
- assert node.attributes['invocations'].value == 2
- assert task.status == task.SUCCESS
- assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
- def test_resume_failed_task(self, workflow_context, thread_executor):
- node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
- self._create_interface(workflow_context, node, mock_failed_before_resuming)
-
- wf_runner = self._create_initial_workflow_runner(workflow_context,
- mock_parallel_tasks_workflow,
- thread_executor)
- wf_thread = Thread(target=wf_runner.execute)
- wf_thread.setDaemon(True)
- wf_thread.start()
-
- self._wait_for_active_and_cancel(wf_runner)
- node = workflow_context.model.node.refresh(node)
-
- task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
- assert node.attributes['invocations'].value == 2
- assert task.status == task.STARTED
- assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
- wf_runner.execution.CANCELLING)
-
- custom_events['is_resumed'].set()
- assert node.attributes['invocations'].value == 2
-
- # Create a new workflow runner, with an existing execution id. This would cause
- # the old execution to restart.
- new_thread_executor = thread.ThreadExecutor()
- try:
- new_wf_runner = WorkflowRunner(
- service_id=wf_runner.service.id,
- inputs={},
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- plugin_manager=None,
- execution_id=wf_runner.execution.id,
- executor=new_thread_executor)
-
- new_wf_runner.execute()
- finally:
- new_thread_executor.close()
-
- # Wait for it to finish and assert changes.
- node = workflow_context.model.node.refresh(node)
- assert node.attributes['invocations'].value == task.max_attempts - 1
- assert task.status == task.SUCCESS
- assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
- def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
- node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
- self._create_interface(workflow_context, node, mock_pass_first_task_only)
-
- wf_runner = self._create_initial_workflow_runner(
- workflow_context,
- mock_parallel_tasks_workflow,
- thread_executor,
- inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
- )
- wf_thread = Thread(target=wf_runner.execute)
- wf_thread.setDaemon(True)
- wf_thread.start()
-
- if custom_events['execution_failed'].wait(60) is False:
- raise TimeoutError("Execution did not end")
-
- tasks = workflow_context.model.task.list(filters={'_stub_type': None})
- node = workflow_context.model.node.refresh(node)
- assert node.attributes['invocations'].value == 3
- failed_task = [t for t in tasks if t.status == t.FAILED][0]
-
- # First task passes
- assert any(task.status == task.FAILED for task in tasks)
- assert failed_task.attempts_count == 2
- # Second task fails
- assert any(task.status == task.SUCCESS for task in tasks)
- assert wf_runner.execution.status in wf_runner.execution.FAILED
-
- custom_events['is_resumed'].set()
- new_thread_executor = thread.ThreadExecutor()
- try:
- new_wf_runner = WorkflowRunner(
- service_id=wf_runner.service.id,
- retry_failed_tasks=True,
- inputs={},
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- plugin_manager=None,
- execution_id=wf_runner.execution.id,
- executor=new_thread_executor)
-
- new_wf_runner.execute()
- finally:
- new_thread_executor.close()
-
- # Wait for it to finish and assert changes.
- node = workflow_context.model.node.refresh(node)
- assert failed_task.attempts_count == 1
- assert node.attributes['invocations'].value == 4
- assert all(task.status == task.SUCCESS for task in tasks)
- assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
- def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
- node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
- self._create_interface(workflow_context, node, mock_fail_first_task_only)
-
- wf_runner = self._create_initial_workflow_runner(
- workflow_context,
- mock_sequential_tasks_workflow,
- thread_executor,
- inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
- )
- wf_thread = Thread(target=wf_runner.execute)
- wf_thread.setDaemon(True)
- wf_thread.start()
-
- if custom_events['execution_failed'].wait(60) is False:
- raise TimeoutError("Execution did not end")
-
- tasks = workflow_context.model.task.list(filters={'_stub_type': None})
- node = workflow_context.model.node.refresh(node)
- assert node.attributes['invocations'].value == 1
- assert any(t.status == t.FAILED for t in tasks)
- assert any(t.status == t.PENDING for t in tasks)
-
- custom_events['is_resumed'].set()
- new_thread_executor = thread.ThreadExecutor()
- try:
- new_wf_runner = WorkflowRunner(
- service_id=wf_runner.service.id,
- inputs={},
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- plugin_manager=None,
- execution_id=wf_runner.execution.id,
- executor=new_thread_executor)
-
- new_wf_runner.execute()
- finally:
- new_thread_executor.close()
-
- # Wait for it to finish and assert changes.
- node = workflow_context.model.node.refresh(node)
- assert node.attributes['invocations'].value == 2
- assert any(t.status == t.SUCCESS for t in tasks)
- assert any(t.status == t.FAILED for t in tasks)
- assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-
-
- @staticmethod
- @pytest.fixture
- def thread_executor():
- result = thread.ThreadExecutor()
- try:
- yield result
- finally:
- result.close()
-
- @staticmethod
- @pytest.fixture
- def workflow_context(tmpdir):
- workflow_context = tests_mock.context.simple(str(tmpdir))
- yield workflow_context
- storage.release_sqlite_storage(workflow_context.model)
-
- @staticmethod
- def _create_interface(ctx, node, func, arguments=None):
- interface_name = 'aria.interfaces.lifecycle'
- operation_kwargs = dict(function='{name}.{func.__name__}'.format(
- name=__name__, func=func))
- if arguments:
- # the operation has to declare the arguments before those may be passed
- operation_kwargs['arguments'] = arguments
- operation_name = 'create'
- interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
- operation_kwargs=operation_kwargs)
- node.interfaces[interface.name] = interface
- ctx.model.node.update(node)
-
- return node, interface_name, operation_name
-
- @staticmethod
- def _engine(workflow_func, workflow_context, executor):
- graph = workflow_func(ctx=workflow_context)
- execution = workflow_context.execution
- graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
- workflow_context.execution = execution
-
- return engine.Engine(executors={executor.__class__: executor})
-
- @pytest.fixture(autouse=True)
- def register_to_events(self):
- def execution_cancelled(*args, **kwargs):
- custom_events['execution_cancelled'].set()
-
- def execution_failed(*args, **kwargs):
- custom_events['execution_failed'].set()
-
- events.on_cancelled_workflow_signal.connect(execution_cancelled)
- events.on_failure_workflow_signal.connect(execution_failed)
- yield
- events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
- events.on_failure_workflow_signal.disconnect(execution_failed)
- for event in custom_events.values():
- event.clear()
-
-
-@workflow
-def mock_sequential_tasks_workflow(ctx, graph,
- retry_interval=1, max_attempts=10, number_of_tasks=1):
- node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
-
-
-@workflow
-def mock_parallel_tasks_workflow(ctx, graph,
- retry_interval=1, max_attempts=10, number_of_tasks=1):
- node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
-
-
-def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
- return [
- api.task.OperationTask(node,
- 'aria.interfaces.lifecycle',
- 'create',
- retry_interval=retry_interval,
- max_attempts=max_attempts)
- for _ in xrange(number_of_tasks)
- ]
-
-
-
-@operation
-def mock_failed_before_resuming(ctx):
- """
- The task should run atmost ctx.task.max_attempts - 1 times, and only then pass.
- overall, the number of invocations should be ctx.task.max_attempts - 1
- """
- ctx.node.attributes['invocations'] += 1
-
- if ctx.node.attributes['invocations'] == 2:
- custom_events['is_active'].set()
- # unfreeze the thread only when all of the invocations are done
- while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
- time.sleep(5)
-
- elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
- # pass only just before the end.
- return
- else:
- # fail o.w.
- raise FailingTask("stop this task")
-
-
-@operation
-def mock_stuck_task(ctx):
- ctx.node.attributes['invocations'] += 1
- while not custom_events['is_resumed'].isSet():
- if not custom_events['is_active'].isSet():
- custom_events['is_active'].set()
- time.sleep(5)
-
-
-@operation
-def mock_pass_first_task_only(ctx):
- ctx.node.attributes['invocations'] += 1
-
- if ctx.node.attributes['invocations'] != 1:
- custom_events['is_active'].set()
- if not custom_events['is_resumed'].isSet():
- # if resume was called, increase by one. o/w fail the execution - second task should
- # fail as long it was not a part of resuming the workflow
- raise FailingTask("wasn't resumed yet")
-
-
-@operation
-def mock_fail_first_task_only(ctx):
- ctx.node.attributes['invocations'] += 1
-
- if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1:
- raise FailingTask("First task should fail")
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 0c704f5..b63416c 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -50,7 +50,7 @@ class BaseTest(object):
@staticmethod
def _engine(workflow_func, workflow_context, executor):
graph = workflow_func(ctx=workflow_context)
- graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
+ graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile(graph)
return engine.Engine(executors={executor.__class__: executor})
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index d804de5..4c1e189 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -128,7 +128,7 @@ def run_operation_on_node(ctx, op_name, interface_name, executor):
operation_name=op_name,
operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
node.interfaces[interface.name] = interface
- graph_compiler.GraphCompiler(ctx, ThreadExecutor).compile(
+ graph_compiler.GraphCompiler(ctx.execution, ThreadExecutor).compile(
single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name)
)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 9f072f6..3770c13 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -80,7 +80,7 @@ def test_task_graph_into_execution_graph(tmpdir):
test_task_graph.add_dependency(inner_task_graph, simple_before_task)
test_task_graph.add_dependency(simple_after_task, inner_task_graph)
- compiler = graph_compiler.GraphCompiler(workflow_context, base.StubTaskExecutor)
+ compiler = graph_compiler.GraphCompiler(workflow_context.execution, base.StubTaskExecutor)
compiler.compile(test_task_graph)
execution_tasks = tuple(topological_sort(_graph(workflow_context.execution.tasks)))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index b26fa43..0e10073 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -57,7 +57,7 @@ def test_decorate_extension(context, executor):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+ graph_compiler.GraphCompiler(context.execution, executor.__class__).compile(graph)
eng = engine.Engine({executor.__class__: executor})
eng.execute(context)
out = get_node(context).attributes.get('out').value
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 47ee2f7..5458358 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -107,7 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+ graph_compiler.GraphCompiler(context.execution, executor.__class__).compile(graph)
eng = engine.Engine({executor.__class__: executor})
eng.execute(context)
out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
[2/2] incubator-ariatosca git commit: wip 2
Posted by mx...@apache.org.
wip 2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/da233c73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/da233c73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/da233c73
Branch: refs/heads/extract_execution_creation_from_workflow_runner
Commit: da233c730aee305806aaa83177b6b0dbd852c264
Parents: 9611f61
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Nov 19 15:56:32 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Nov 20 10:14:05 2017 +0200
----------------------------------------------------------------------
aria/cli/commands/executions.py | 44 +-
aria/orchestrator/execution/__init__.py | 17 +
aria/orchestrator/execution/compiler.py | 149 ++++
aria/orchestrator/execution/runner.py | 50 ++
aria/orchestrator/workflow_runner.py | 210 ------
.../workflows/core/graph_compiler.py | 12 +-
test_ssh.py | 528 --------------
tests/orchestrator/context/__init__.py | 2 +-
tests/orchestrator/context/test_serialize.py | 2 +-
tests/orchestrator/execution/__init__.py | 14 +
.../execution/test_execution_compiler.py | 698 ++++++++++++++++++
.../orchestrator/execution_plugin/test_local.py | 3 +-
tests/orchestrator/execution_plugin/test_ssh.py | 2 +-
tests/orchestrator/test_workflow_runner.py | 726 -------------------
.../orchestrator/workflows/core/test_engine.py | 2 +-
.../orchestrator/workflows/core/test_events.py | 2 +-
.../test_task_graph_into_execution_graph.py | 2 +-
.../executor/test_process_executor_extension.py | 2 +-
.../test_process_executor_tracked_changes.py | 2 +-
19 files changed, 962 insertions(+), 1505 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index 2ab3a33..162abfa 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -26,7 +26,7 @@ from .. import logger as cli_logger
from .. import execution_logging
from ..core import aria
from ...modeling.models import Execution
-from ...orchestrator import workflow_runner
+from ...orchestrator import execution
from ...orchestrator.workflows.executor.dry import DryExecutor
from ...utils import formatting
from ...utils import threading
@@ -143,20 +143,19 @@ def start(workflow_name,
service = model_storage.service.get_by_name(service_name)
executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
- execution = workflow_runner.create_execution_model(
- service, workflow_name, inputs
- )
- model_storage.execution.put(execution)
-
- wf_runner = \
- workflow_runner.WorkflowRunner(
- model_storage, resource_storage, plugin_manager,
- execution=execution, executor=executor,
- task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
- )
+ new_execution = execution.ExecutionCompiler(
+ model_storage,
+ resource_storage,
+ plugin_manager,
+ service,
+ workflow_name
+ ).compile(inputs, executor=executor)
+ model_storage.execution.put(new_execution)
+
+ execution_runner = execution.ExecutionRunner(executor=executor)
logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
- _run_execution(wf_runner, logger, model_storage, dry, mark_pattern)
+ _run_execution(execution_runner, logger, model_storage, dry, mark_pattern)
@executions.command(name='resume',
@@ -185,23 +184,18 @@ def resume(execution_id,
"""
executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
- execution = model_storage.execution.get(execution_id)
- if execution.status != execution.CANCELLED:
+ execution_to_resume = model_storage.execution.get(execution_id)
+ if execution_to_resume.status != execution_to_resume.CANCELLED:
logger.info("Can't resume execution {execution.id} - "
"execution is in status {execution.status}. "
- "Can only resume executions in status {valid_status}"
- .format(execution=execution, valid_status=execution.CANCELLED))
+ "Can only resume executions in status {execution_to_resume.CANCELLED}"
+ .format(execution=execution_to_resume))
return
-
- wf_runner = \
- workflow_runner.WorkflowRunner(
- model_storage, resource_storage, plugin_manager,
- execution=execution, executor=executor, resume=True,
- retry_failed_tasks=retry_failed_tasks,
- )
+
+ execution_runner = execution.ExecutionRunner(executor, True, retry_failed_tasks)
logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
- _run_execution(wf_runner, logger, model_storage, dry, mark_pattern)
+ _run_execution(execution_runner, logger, model_storage, dry, mark_pattern)
def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/execution/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution/__init__.py b/aria/orchestrator/execution/__init__.py
new file mode 100644
index 0000000..ef17fde
--- /dev/null
+++ b/aria/orchestrator/execution/__init__.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .compiler import ExecutionCompiler
+from .runner import ExecutionRunner
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/execution/compiler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution/compiler.py b/aria/orchestrator/execution/compiler.py
new file mode 100644
index 0000000..5db52d4
--- /dev/null
+++ b/aria/orchestrator/execution/compiler.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from .. import exceptions
+from ..context.workflow import WorkflowContext
+from ..workflows import builtin
+from ..workflows.core import graph_compiler
+from ..workflows.executor.process import ProcessExecutor
+from ...modeling import models
+from ...modeling import utils as modeling_utils
+from ...utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+ def __init__(self, model, resource, plugin, service, workflow_name):
+ self._model = model
+ self._resource = resource
+ self._plugin = plugin
+ self._service = service
+ self._workflow_name = workflow_name
+ self._workflow_context = None
+
+ @property
+ def workflow_ctx(self):
+ return self._workflow_context
+
+ def compile(
+ self,
+ execution_inputs=None,
+ executor=None,
+ task_max_attempts=None,
+ task_retry_interval=None):
+
+ execution = self._create_execution_model(execution_inputs)
+ self._model.execution.put(execution)
+ self._set_ctx(execution, task_max_attempts, task_retry_interval)
+ self._create_tasks(execution, executor=executor)
+ self._model.execution.update(execution)
+ return execution
+
+ def _set_ctx(self, execution, task_max_attempts=None, task_retry_interval=None):
+ self._workflow_context = WorkflowContext(
+ name=self.__class__.__name__,
+ model_storage=self._model,
+ resource_storage=self._resource,
+ service_id=execution.service.id,
+ execution_id=execution.id,
+ workflow_name=execution.workflow_name,
+ task_max_attempts=task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS,
+ task_retry_interval=task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
+ )
+
+ def _create_tasks(self, execution, executor=None):
+
+ # Set default executor and kwargs
+ executor = executor or ProcessExecutor(plugin_manager=self._plugin)
+
+ # transforming the execution inputs to dict, to pass them to the workflow function
+ execution_inputs_dict = dict(inp.unwrapped for inp in execution.inputs.itervalues())
+
+ if len(execution.tasks) == 0:
+ workflow_fn = self._get_workflow_fn(execution.workflow_name)
+ self._tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict)
+ compiler = graph_compiler.GraphCompiler(self.workflow_ctx.execution, executor.__class__)
+ compiler.compile(self._tasks_graph)
+
+ def _create_execution_model(self, inputs=None):
+ self._validate_workflow_exists_for_service()
+ self._validate_no_active_executions()
+
+ execution = models.Execution(
+ created_at=datetime.utcnow(),
+ service_fk=self._service.id,
+ workflow_name=self._workflow_name,
+ inputs={})
+
+ if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+ workflow_inputs = dict() # built-in workflows don't have any inputs
+ else:
+ workflow_inputs = self._service.workflows[self._workflow_name].inputs
+
+ modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
+ supplied_inputs=inputs or {})
+ modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
+ supplied_inputs=inputs or {})
+ execution.inputs = modeling_utils.merge_parameter_values(
+ inputs, workflow_inputs, model_cls=models.Input)
+
+ return execution
+
+ def _validate_no_active_executions(self):
+ active_executions = [e for e in self._service.executions if
+ e.is_active()]
+ if active_executions:
+ raise exceptions.ActiveExecutionsError(
+ "Can't start execution; Service {0} has an active execution with ID {1}"
+ .format(self._service.name, active_executions[0].id))
+
+ def _validate_workflow_exists_for_service(self):
+ if self._workflow_name not in self._service.workflows and \
+ self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
+ raise exceptions.UndeclaredWorkflowError(
+ 'No workflow policy {0} declared in service {1}'
+ .format(self._workflow_name, self._service.name))
+
+ def _get_workflow_fn(self, workflow_name):
+ if workflow_name in builtin.BUILTIN_WORKFLOWS:
+ return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
+ workflow_name))
+
+ workflow = self._service.workflows[workflow_name]
+
+ # TODO: Custom workflow support needs improvement, currently this code uses internal
+ # knowledge of the resource storage; Instead, workflows should probably be loaded
+ # in a similar manner to operation plugins. Also consider passing to import_fullname
+ # as paths instead of appending to sys path.
+ service_template_resources_path = os.path.join(
+ self._resource.service_template.base_path,
+ str(self._service.service_template.id))
+ sys.path.append(service_template_resources_path)
+
+ try:
+ workflow_fn = import_fullname(workflow.function)
+ except ImportError:
+ raise exceptions.WorkflowImplementationNotFoundError(
+ 'Could not find workflow {0} function at {1}'.format(
+ workflow_name, workflow.function))
+
+ return workflow_fn
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/execution/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution/runner.py b/aria/orchestrator/execution/runner.py
new file mode 100644
index 0000000..a532901
--- /dev/null
+++ b/aria/orchestrator/execution/runner.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Running workflows.
+"""
+
+from ..workflows.core import engine
+
+
+class ExecutionRunner(object):
+
+ def __init__(self, executor, resume=False, retry_failed_tasks=False):
+ """
+ Manages a single workflow execution on a given service.
+
+ :param workflow_name: workflow name
+ :param service_id: service ID
+ :param inputs: key-value dict of inputs for the execution
+ :param model_storage: model storage API ("MAPI")
+ :param resource_storage: resource storage API ("RAPI")
+ :param plugin_manager: plugin manager
+ :param executor: executor for tasks; defaults to a
+ :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance
+ :param task_max_attempts: maximum attempts of repeating each failing task
+ :param task_retry_interval: retry interval between retry attempts of a failing task
+ """
+
+ self._is_resume = resume
+ self._retry_failed_tasks = retry_failed_tasks
+ self._engine = engine.Engine(executors={executor.__class__: executor})
+
+ def execute(self, ctx):
+ self._engine.execute(
+ ctx=ctx, resuming=self._is_resume, retry_failed=self._retry_failed_tasks)
+
+ def cancel(self, ctx):
+ self._engine.cancel_execution(ctx)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
deleted file mode 100644
index aebde8e..0000000
--- a/aria/orchestrator/workflow_runner.py
+++ /dev/null
@@ -1,210 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Running workflows.
-"""
-
-import os
-import sys
-from datetime import datetime
-
-from . import exceptions
-from .context.workflow import WorkflowContext
-from .workflows import builtin
-from .workflows.core import engine, graph_compiler
-from .workflows.executor.process import ProcessExecutor
-from ..modeling import models
-from ..modeling import utils as modeling_utils
-from ..utils.imports import import_fullname
-
-
-DEFAULT_TASK_MAX_ATTEMPTS = 30
-DEFAULT_TASK_RETRY_INTERVAL = 30
-
-
-class WorkflowRunner(object):
-
- def __init__(
- self,
- model_storage,
- resource_storage,
- plugin_manager,
- execution,
- executor=None,
- resume=False,
- retry_failed_tasks=False,
- task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
- task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL
- ):
- """
- Manages a single workflow execution on a given service.
-
- :param workflow_name: workflow name
- :param service_id: service ID
- :param inputs: key-value dict of inputs for the execution
- :param model_storage: model storage API ("MAPI")
- :param resource_storage: resource storage API ("RAPI")
- :param plugin_manager: plugin manager
- :param executor: executor for tasks; defaults to a
- :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance
- :param task_max_attempts: maximum attempts of repeating each failing task
- :param task_retry_interval: retry interval between retry attempts of a failing task
- """
-
- self._is_resume = resume
- self._retry_failed_tasks = retry_failed_tasks
-
- self._model_storage = model_storage
- self._resource_storage = resource_storage
- self._execution = execution
-
- # the IDs are stored rather than the models themselves, so this module could be used
- # by several threads without raising errors on model objects shared between threads
-
- self._workflow_context = WorkflowContext(
- name=self.__class__.__name__,
- model_storage=self._model_storage,
- resource_storage=resource_storage,
- service_id=self.execution.service.id,
- execution_id=self.execution.id,
- workflow_name=self.execution.workflow_name,
- task_max_attempts=task_max_attempts,
- task_retry_interval=task_retry_interval)
-
- # Set default executor and kwargs
- executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
-
- # transforming the execution inputs to dict, to pass them to the workflow function
- execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.itervalues())
-
- if not self._is_resume:
- workflow_fn = self._get_workflow_fn()
- self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
- compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__)
- compiler.compile(self._tasks_graph)
-
- self._engine = engine.Engine(executors={executor.__class__: executor})
-
- @property
- def execution_id(self):
- return self.execution.id
-
- @property
- def execution(self):
- return self._model_storage.execution.get(self.execution_id)
-
- @property
- def service(self):
- return self._model_storage.service.get(self._execution.service.id)
-
- def execute(self):
- self._engine.execute(ctx=self._workflow_context,
- resuming=self._is_resume,
- retry_failed=self._retry_failed_tasks)
-
- def cancel(self):
- self._engine.cancel_execution(ctx=self._workflow_context)
-
- def _get_workflow_fn(self):
- if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
- return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
- self.execution.workflow_name))
-
- workflow = self.service.workflows[self.execution.workflow_name]
-
- # TODO: Custom workflow support needs improvement, currently this code uses internal
- # knowledge of the resource storage; Instead, workflows should probably be loaded
- # in a similar manner to operation plugins. Also consider passing to import_fullname
- # as paths instead of appending to sys path.
- service_template_resources_path = os.path.join(
- self._resource_storage.service_template.base_path,
- str(self.service.service_template.id))
- sys.path.append(service_template_resources_path)
-
- try:
- workflow_fn = import_fullname(workflow.function)
- except ImportError:
- raise exceptions.WorkflowImplementationNotFoundError(
- 'Could not find workflow {0} function at {1}'.format(
- self._workflow_name, workflow.function))
-
- return workflow_fn
-
-
-def create_execution_model(service, workflow_name, inputs):
- _validate_workflow_exists_for_service(service, workflow_name)
- _validate_no_active_executions(service)
- execution = models.Execution(
- created_at=datetime.utcnow(),
- service_fk=service.id,
- workflow_name=workflow_name,
- inputs={})
-
- if workflow_name in builtin.BUILTIN_WORKFLOWS:
- workflow_inputs = dict() # built-in workflows don't have any inputs
- else:
- workflow_inputs = service.workflows[workflow_name].inputs
-
- modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
- supplied_inputs=inputs or {})
- modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
- supplied_inputs=inputs or {})
- execution.inputs = modeling_utils.merge_parameter_values(
- inputs, workflow_inputs, model_cls=models.Input)
-
- return execution
-
-
-
-def _create_execution_model(self, inputs):
- execution = models.Execution(
- created_at=datetime.utcnow(),
- service=self.service,
- workflow_name=self._workflow_name,
- inputs={})
-
- if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
- workflow_inputs = dict() # built-in workflows don't have any inputs
- else:
- workflow_inputs = self.service.workflows[self._workflow_name].inputs
-
- modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
- supplied_inputs=inputs or {})
- modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
- supplied_inputs=inputs or {})
- execution.inputs = modeling_utils.merge_parameter_values(
- inputs, workflow_inputs, model_cls=models.Input)
- # TODO: these two following calls should execute atomically
- self._validate_no_active_executions(execution)
- self._model_storage.execution.put(execution)
- return execution
-
-
-
-def _validate_no_active_executions(service):
- active_executions = [e for e in service.executions if e.is_active()]
- if active_executions:
- raise exceptions.ActiveExecutionsError(
- "Can't start execution; Service {0} has an active execution with ID {1}"
- .format(service.name, active_executions[0].id))
-
-
-def _validate_workflow_exists_for_service(service, workflow_name):
- if workflow_name not in service.workflows and \
- workflow_name not in builtin.BUILTIN_WORKFLOWS:
- raise exceptions.UndeclaredWorkflowError(
- 'No workflow policy {0} declared in service {1}'
- .format(workflow_name, service.name))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/workflows/core/graph_compiler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/graph_compiler.py b/aria/orchestrator/workflows/core/graph_compiler.py
index 81543d5..aeb05bb 100644
--- a/aria/orchestrator/workflows/core/graph_compiler.py
+++ b/aria/orchestrator/workflows/core/graph_compiler.py
@@ -19,8 +19,8 @@ from .. import executor, api
class GraphCompiler(object):
- def __init__(self, ctx, default_executor):
- self._ctx = ctx
+ def __init__(self, execution, default_executor):
+ self._execution = execution
self._default_executor = default_executor
self._stub_executor = executor.base.StubTaskExecutor
self._model_to_api_id = {}
@@ -65,7 +65,7 @@ class GraphCompiler(object):
# Insert end marker
self._create_stub_task(
end_stub_type,
- self._get_non_dependent_tasks(self._ctx.execution) or [start_task],
+ self._get_non_dependent_tasks(self._execution) or [start_task],
self._end_graph_suffix(task_graph.id),
task_graph.name
)
@@ -74,17 +74,15 @@ class GraphCompiler(object):
model_task = models.Task(
name=name,
dependencies=dependencies,
- execution=self._ctx.execution,
+ execution=self._execution,
_executor=self._stub_executor,
_stub_type=stub_type)
- self._ctx.model.task.put(model_task)
self._model_to_api_id[model_task.id] = api_id
return model_task
def _create_operation_task(self, api_task, dependencies):
model_task = models.Task.from_api_task(
api_task, self._default_executor, dependencies=dependencies)
- self._ctx.model.task.put(model_task)
self._model_to_api_id[model_task.id] = api_task.id
return model_task
@@ -113,6 +111,6 @@ class GraphCompiler(object):
dependency_name = dependency.id
else:
dependency_name = self._end_graph_suffix(dependency.id)
- tasks.extend(task for task in self._ctx.execution.tasks
+ tasks.extend(task for task in self._execution.tasks
if self._model_to_api_id.get(task.id, None) == dependency_name)
return tasks
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/test_ssh.py
----------------------------------------------------------------------
diff --git a/test_ssh.py b/test_ssh.py
deleted file mode 100644
index 5256cf8..0000000
--- a/test_ssh.py
+++ /dev/null
@@ -1,528 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import contextlib
-import json
-import logging
-import os
-
-import pytest
-
-import fabric.api
-from fabric.contrib import files
-from fabric import context_managers
-
-from aria.modeling import models
-from aria.orchestrator import events
-from aria.orchestrator import workflow
-from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.executor import process
-from aria.orchestrator.workflows.core import engine, graph_compiler
-from aria.orchestrator.workflows.exceptions import ExecutorException
-from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
-from aria.orchestrator.execution_plugin import operations
-from aria.orchestrator.execution_plugin import constants
-from aria.orchestrator.execution_plugin.exceptions import ProcessException, TaskException
-from aria.orchestrator.execution_plugin.ssh import operations as ssh_operations
-
-from tests import mock, storage, resources
-from tests.orchestrator.workflows.helpers import events_collector
-
-_CUSTOM_BASE_DIR = '/tmp/new-aria-ctx'
-
-import tests
-KEY_FILENAME = os.path.join(tests.ROOT_DIR, 'tests/resources/keys/test')
-
-_FABRIC_ENV = {
- 'disable_known_hosts': True,
- 'user': 'test',
- 'key_filename': KEY_FILENAME
-}
-
-
-import mockssh
-@pytest.fixture(scope='session')
-def server():
- with mockssh.Server({'test': KEY_FILENAME}) as s:
- yield s
-
-
-#@pytest.mark.skipif(not os.environ.get('TRAVIS'), reason='actual ssh server required')
-class TestWithActualSSHServer(object):
-
- def test_run_script_basic(self):
- expected_attribute_value = 'some_value'
- props = self._execute(env={'test_value': expected_attribute_value})
- assert props['test_value'].value == expected_attribute_value
-
- @pytest.mark.skip(reason='sudo privileges are required')
- def test_run_script_as_sudo(self):
- self._execute(use_sudo=True)
- with self._ssh_env():
- assert files.exists('/opt/test_dir')
- fabric.api.sudo('rm -rf /opt/test_dir')
-
- def test_run_script_default_base_dir(self):
- props = self._execute()
- assert props['work_dir'].value == '{0}/work'.format(constants.DEFAULT_BASE_DIR)
-
- @pytest.mark.skip(reason='Re-enable once output from process executor can be captured')
- @pytest.mark.parametrize('hide_groups', [[], ['everything']])
- def test_run_script_with_hide(self, hide_groups):
- self._execute(hide_output=hide_groups)
- output = 'TODO'
- expected_log_message = ('[localhost] run: source {0}/scripts/'
- .format(constants.DEFAULT_BASE_DIR))
- if hide_groups:
- assert expected_log_message not in output
- else:
- assert expected_log_message in output
-
- def test_run_script_process_config(self):
- expected_env_value = 'test_value_env'
- expected_arg1_value = 'test_value_arg1'
- expected_arg2_value = 'test_value_arg2'
- expected_cwd = '/tmp'
- expected_base_dir = _CUSTOM_BASE_DIR
- props = self._execute(
- env={'test_value_env': expected_env_value},
- process={
- 'args': [expected_arg1_value, expected_arg2_value],
- 'cwd': expected_cwd,
- 'base_dir': expected_base_dir
- })
- assert props['env_value'].value == expected_env_value
- assert len(props['bash_version'].value) > 0
- assert props['arg1_value'].value == expected_arg1_value
- assert props['arg2_value'].value == expected_arg2_value
- assert props['cwd'].value == expected_cwd
- assert props['ctx_path'].value == '{0}/ctx'.format(expected_base_dir)
-
- def test_run_script_command_prefix(self):
- props = self._execute(process={'command_prefix': 'bash -i'})
- assert 'i' in props['dollar_dash'].value
-
- def test_run_script_reuse_existing_ctx(self):
- expected_test_value_1 = 'test_value_1'
- expected_test_value_2 = 'test_value_2'
- props = self._execute(
- test_operations=['{0}_1'.format(self.test_name),
- '{0}_2'.format(self.test_name)],
- env={'test_value1': expected_test_value_1,
- 'test_value2': expected_test_value_2})
- assert props['test_value1'].value == expected_test_value_1
- assert props['test_value2'].value == expected_test_value_2
-
- def test_run_script_download_resource_plain(self, tmpdir):
- resource = tmpdir.join('resource')
- resource.write('content')
- self._upload(str(resource), 'test_resource')
- props = self._execute()
- assert props['test_value'].value == 'content'
-
- def test_run_script_download_resource_and_render(self, tmpdir):
- resource = tmpdir.join('resource')
- resource.write('{{ctx.service.name}}')
- self._upload(str(resource), 'test_resource')
- props = self._execute()
- assert props['test_value'].value == self._workflow_context.service.name
-
- @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
- def test_run_script_inputs_as_env_variables_no_override(self, value):
- props = self._execute(custom_input=value)
- return_value = props['test_value'].value
- expected = return_value if isinstance(value, basestring) else json.loads(return_value)
- assert value == expected
-
- @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
- def test_run_script_inputs_as_env_variables_process_env_override(self, value):
- props = self._execute(custom_input='custom-input-value',
- env={'custom_env_var': value})
- return_value = props['test_value'].value
- expected = return_value if isinstance(value, basestring) else json.loads(return_value)
- assert value == expected
-
- def test_run_script_error_in_script(self):
- exception = self._execute_and_get_task_exception()
- assert isinstance(exception, TaskException)
-
- def test_run_script_abort_immediate(self):
- exception = self._execute_and_get_task_exception()
- assert isinstance(exception, TaskAbortException)
- assert exception.message == 'abort-message'
-
- def test_run_script_retry(self):
- exception = self._execute_and_get_task_exception()
- assert isinstance(exception, TaskRetryException)
- assert exception.message == 'retry-message'
-
- def test_run_script_abort_error_ignored_by_script(self):
- exception = self._execute_and_get_task_exception()
- assert isinstance(exception, TaskAbortException)
- assert exception.message == 'abort-message'
-
- def test_run_commands(self):
- temp_file_path = '/tmp/very_temporary_file'
- with self._ssh_env():
- if files.exists(temp_file_path):
- fabric.api.run('rm {0}'.format(temp_file_path))
- self._execute(commands=['touch {0}'.format(temp_file_path)])
- with self._ssh_env():
- assert files.exists(temp_file_path)
- fabric.api.run('rm {0}'.format(temp_file_path))
-
- @pytest.fixture(autouse=True)
- def _setup(self, request, workflow_context, executor, capfd, server):
- print 'HI!!!!!!!!!!', server.port
- self._workflow_context = workflow_context
- self._executor = executor
- self._capfd = capfd
- self.test_name = request.node.originalname or request.node.name
- with self._ssh_env(server):
- for directory in [constants.DEFAULT_BASE_DIR, _CUSTOM_BASE_DIR]:
- if files.exists(directory):
- fabric.api.run('rm -rf {0}'.format(directory))
-
- @contextlib.contextmanager
- def _ssh_env(self, server):
- with self._capfd.disabled():
- with context_managers.settings(fabric.api.hide('everything'),
- host_string='localhost:{0}'.format(server.port),
- **_FABRIC_ENV):
- yield
-
- def _execute(self,
- env=None,
- use_sudo=False,
- hide_output=None,
- process=None,
- custom_input='',
- test_operations=None,
- commands=None):
- process = process or {}
- if env:
- process.setdefault('env', {}).update(env)
-
- test_operations = test_operations or [self.test_name]
-
- local_script_path = os.path.join(resources.DIR, 'scripts', 'test_ssh.sh')
- script_path = os.path.basename(local_script_path)
- self._upload(local_script_path, script_path)
-
- if commands:
- operation = operations.run_commands_with_ssh
- else:
- operation = operations.run_script_with_ssh
-
- node = self._workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
- arguments = {
- 'script_path': script_path,
- 'fabric_env': _FABRIC_ENV,
- 'process': process,
- 'use_sudo': use_sudo,
- 'custom_env_var': custom_input,
- 'test_operation': '',
- }
- if hide_output:
- arguments['hide_output'] = hide_output
- if commands:
- arguments['commands'] = commands
- interface = mock.models.create_interface(
- node.service,
- 'test',
- 'op',
- operation_kwargs=dict(
- function='{0}.{1}'.format(
- operations.__name__,
- operation.__name__),
- arguments=arguments)
- )
- node.interfaces[interface.name] = interface
-
- @workflow
- def mock_workflow(ctx, graph):
- ops = []
- for test_operation in test_operations:
- op_arguments = arguments.copy()
- op_arguments['test_operation'] = test_operation
- ops.append(api.task.OperationTask(
- node,
- interface_name='test',
- operation_name='op',
- arguments=op_arguments))
-
- graph.sequence(*ops)
- return graph
- tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter
- graph_compiler.GraphCompiler(
- self._workflow_context, self._executor.__class__).compile(tasks_graph)
- eng = engine.Engine({self._executor.__class__: self._executor})
- eng.execute(self._workflow_context)
- return self._workflow_context.model.node.get_by_name(
- mock.models.DEPENDENCY_NODE_NAME).attributes
-
- def _execute_and_get_task_exception(self, *args, **kwargs):
- signal = events.on_failure_task_signal
- with events_collector(signal) as collected:
- with pytest.raises(ExecutorException):
- self._execute(*args, **kwargs)
- return collected[signal][0]['kwargs']['exception']
-
- def _upload(self, source, path):
- self._workflow_context.resource.service.upload(
- entry_id=str(self._workflow_context.service.id),
- source=source,
- path=path)
-
- @pytest.fixture
- def executor(self):
- result = process.ProcessExecutor()
- try:
- yield result
- finally:
- result.close()
-
- @pytest.fixture
- def workflow_context(self, tmpdir):
- workflow_context = mock.context.simple(str(tmpdir))
- workflow_context.states = []
- workflow_context.exception = None
- yield workflow_context
- storage.release_sqlite_storage(workflow_context.model)
-
-
-class TestFabricEnvHideGroupsAndRunCommands(object):
-
- def test_fabric_env_default_override(self):
- # first sanity for no override
- self._run()
- assert self.mock.settings_merged['timeout'] == constants.FABRIC_ENV_DEFAULTS['timeout']
- # now override
- invocation_fabric_env = self.default_fabric_env.copy()
- timeout = 1000000
- invocation_fabric_env['timeout'] = timeout
- self._run(fabric_env=invocation_fabric_env)
- assert self.mock.settings_merged['timeout'] == timeout
-
- def test_implicit_host_string(self, mocker):
- expected_host_address = '1.1.1.1'
- mocker.patch.object(self._Ctx.task.actor, 'host')
- mocker.patch.object(self._Ctx.task.actor.host, 'host_address', expected_host_address)
- fabric_env = self.default_fabric_env.copy()
- del fabric_env['host_string']
- self._run(fabric_env=fabric_env)
- assert self.mock.settings_merged['host_string'] == expected_host_address
-
- def test_explicit_host_string(self):
- fabric_env = self.default_fabric_env.copy()
- host_string = 'explicit_host_string'
- fabric_env['host_string'] = host_string
- self._run(fabric_env=fabric_env)
- assert self.mock.settings_merged['host_string'] == host_string
-
- def test_override_warn_only(self):
- fabric_env = self.default_fabric_env.copy()
- self._run(fabric_env=fabric_env)
- assert self.mock.settings_merged['warn_only'] is True
- fabric_env = self.default_fabric_env.copy()
- fabric_env['warn_only'] = False
- self._run(fabric_env=fabric_env)
- assert self.mock.settings_merged['warn_only'] is False
-
- def test_missing_host_string(self):
- with pytest.raises(TaskAbortException) as exc_ctx:
- fabric_env = self.default_fabric_env.copy()
- del fabric_env['host_string']
- self._run(fabric_env=fabric_env)
- assert '`host_string` not supplied' in str(exc_ctx.value)
-
- def test_missing_user(self):
- with pytest.raises(TaskAbortException) as exc_ctx:
- fabric_env = self.default_fabric_env.copy()
- del fabric_env['user']
- self._run(fabric_env=fabric_env)
- assert '`user` not supplied' in str(exc_ctx.value)
-
- def test_missing_key_or_password(self):
- with pytest.raises(TaskAbortException) as exc_ctx:
- fabric_env = self.default_fabric_env.copy()
- del fabric_env['key_filename']
- self._run(fabric_env=fabric_env)
- assert 'Access credentials not supplied' in str(exc_ctx.value)
-
- def test_hide_in_settings_and_non_viable_groups(self):
- groups = ('running', 'stdout')
- self._run(hide_output=groups)
- assert set(self.mock.settings_merged['hide_output']) == set(groups)
- with pytest.raises(TaskAbortException) as exc_ctx:
- self._run(hide_output=('running', 'bla'))
- assert '`hide_output` must be a subset of' in str(exc_ctx.value)
-
- def test_run_commands(self):
- def test(use_sudo):
- commands = ['command1', 'command2']
- self._run(
- commands=commands,
- use_sudo=use_sudo)
- assert all(item in self.mock.settings_merged.items() for
- item in self.default_fabric_env.items())
- assert self.mock.settings_merged['warn_only'] is True
- assert self.mock.settings_merged['use_sudo'] == use_sudo
- assert self.mock.commands == commands
- self.mock.settings_merged = {}
- self.mock.commands = []
- test(use_sudo=False)
- test(use_sudo=True)
-
- def test_failed_command(self):
- with pytest.raises(ProcessException) as exc_ctx:
- self._run(commands=['fail'])
- exception = exc_ctx.value
- assert exception.stdout == self.MockCommandResult.stdout
- assert exception.stderr == self.MockCommandResult.stderr
- assert exception.command == self.MockCommandResult.command
- assert exception.exit_code == self.MockCommandResult.return_code
-
- class MockCommandResult(object):
- stdout = 'mock_stdout'
- stderr = 'mock_stderr'
- command = 'mock_command'
- return_code = 1
-
- def __init__(self, failed):
- self.failed = failed
-
- class MockFabricApi(object):
-
- def __init__(self):
- self.commands = []
- self.settings_merged = {}
-
- @contextlib.contextmanager
- def settings(self, *args, **kwargs):
- self.settings_merged.update(kwargs)
- if args:
- groups = args[0]
- self.settings_merged.update({'hide_output': groups})
- yield
-
- def run(self, command):
- self.commands.append(command)
- self.settings_merged['use_sudo'] = False
- return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail')
-
- def sudo(self, command):
- self.commands.append(command)
- self.settings_merged['use_sudo'] = True
- return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail')
-
- def hide(self, *groups):
- return groups
-
- def exists(self, *args, **kwargs):
- raise RuntimeError
-
- class _Ctx(object):
- INSTRUMENTATION_FIELDS = ()
-
- class Task(object):
- @staticmethod
- def abort(message=None):
- models.Task.abort(message)
- actor = None
-
- class Actor(object):
- host = None
-
- class Model(object):
- @contextlib.contextmanager
- def instrument(self, *args, **kwargs):
- yield
- task = Task
- task.actor = Actor
- model = Model()
- logger = logging.getLogger()
-
- @staticmethod
- @contextlib.contextmanager
- def _mock_self_logging(*args, **kwargs):
- yield
- _Ctx.logging_handlers = _mock_self_logging
-
- @pytest.fixture(autouse=True)
- def _setup(self, mocker):
- self.default_fabric_env = {
- 'host_string': 'test',
- 'user': 'test',
- 'key_filename': 'test',
- }
- self.mock = self.MockFabricApi()
- mocker.patch('fabric.api', self.mock)
-
- def _run(self,
- commands=(),
- fabric_env=None,
- process=None,
- use_sudo=False,
- hide_output=None):
- operations.run_commands_with_ssh(
- ctx=self._Ctx,
- commands=commands,
- process=process,
- fabric_env=fabric_env or self.default_fabric_env,
- use_sudo=use_sudo,
- hide_output=hide_output)
-
-
-class TestUtilityFunctions(object):
-
- def test_paths(self):
- base_dir = '/path'
- local_script_path = '/local/script/path.py'
- paths = ssh_operations._Paths(base_dir=base_dir,
- local_script_path=local_script_path)
- assert paths.local_script_path == local_script_path
- assert paths.remote_ctx_dir == base_dir
- assert paths.base_script_path == 'path.py'
- assert paths.remote_ctx_path == '/path/ctx'
- assert paths.remote_scripts_dir == '/path/scripts'
- assert paths.remote_work_dir == '/path/work'
- assert paths.remote_env_script_path.startswith('/path/scripts/env-path.py-')
- assert paths.remote_script_path.startswith('/path/scripts/path.py-')
-
- def test_write_environment_script_file(self):
- base_dir = '/path'
- local_script_path = '/local/script/path.py'
- paths = ssh_operations._Paths(base_dir=base_dir,
- local_script_path=local_script_path)
- env = {'one': "'1'"}
- local_socket_url = 'local_socket_url'
- remote_socket_url = 'remote_socket_url'
- env_script_lines = set([l for l in ssh_operations._write_environment_script_file(
- process={'env': env},
- paths=paths,
- local_socket_url=local_socket_url,
- remote_socket_url=remote_socket_url
- ).getvalue().split('\n') if l])
- expected_env_script_lines = set([
- 'export PATH=/path:$PATH',
- 'export PYTHONPATH=/path:$PYTHONPATH',
- 'chmod +x /path/ctx',
- 'chmod +x {0}'.format(paths.remote_script_path),
- 'export CTX_SOCKET_URL={0}'.format(remote_socket_url),
- 'export LOCAL_CTX_SOCKET_URL={0}'.format(local_socket_url),
- 'export one=\'1\''
- ])
- assert env_script_lines == expected_env_script_lines
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index 780db07..d0b85d3 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -26,7 +26,7 @@ def op_path(func, module_path=None):
def execute(workflow_func, workflow_context, executor):
graph = workflow_func(ctx=workflow_context)
- graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
+ graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile(graph)
eng = engine.Engine(executors={executor.__class__: executor})
eng.execute(workflow_context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 091e23c..8e08e72 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -48,7 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir):
context.model.node.update(node)
graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+ graph_compiler.GraphCompiler(context.execution, executor.__class__).compile(graph)
eng = engine.Engine({executor.__class__: executor})
eng.execute(context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution/__init__.py b/tests/orchestrator/execution/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/orchestrator/execution/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution/test_execution_compiler.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution/test_execution_compiler.py b/tests/orchestrator/execution/test_execution_compiler.py
new file mode 100644
index 0000000..b044872
--- /dev/null
+++ b/tests/orchestrator/execution/test_execution_compiler.py
@@ -0,0 +1,698 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import time
+from threading import Thread, Event
+from datetime import datetime
+
+import mock
+import pytest
+
+from aria.modeling import exceptions as modeling_exceptions
+from aria.modeling import models
+from aria.orchestrator import exceptions
+from aria.orchestrator import events
+from aria.orchestrator import execution as orch_execution
+from aria.orchestrator.workflows.executor.process import ProcessExecutor
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import thread
+from aria.orchestrator import (
+ workflow,
+ operation,
+)
+
+from tests import (
+ mock as tests_mock,
+ storage
+)
+
+from ...fixtures import ( # pylint: disable=unused-import
+ plugins_dir,
+ plugin_manager,
+ fs_model as model,
+ resource_storage as resource
+)
+
+custom_events = {
+ 'is_resumed': Event(),
+ 'is_active': Event(),
+ 'execution_cancelled': Event(),
+ 'execution_failed': Event(),
+}
+
+
+class TimeoutError(BaseException):
+ pass
+
+
+class FailingTask(BaseException):
+ pass
+
+
+def test_undeclared_workflow(request):
+ # validating a proper error is raised when the workflow is not declared in the service
+ with pytest.raises(exceptions.UndeclaredWorkflowError):
+ _get_compiler(request, 'undeclared_workflow').compile()
+
+
+def test_missing_workflow_implementation(service, request):
+ # validating a proper error is raised when the workflow code path does not exist
+ workflow = models.Operation(
+ name='test_workflow',
+ service=service,
+ function='nonexistent.workflow.implementation')
+ service.workflows['test_workflow'] = workflow
+
+ with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
+ _get_compiler(request, 'test_workflow').compile()
+
+
+def test_builtin_workflow_instantiation(request, model):
+ # validates the workflow runner instantiates properly when provided with a builtin workflow
+ # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
+ execution = _get_compiler(request, 'install').compile()
+ assert len(execution.tasks) == 18 # expecting 18 tasks for 2 node topology
+
+
+def test_custom_workflow_instantiation(request):
+ # validates the workflow runner instantiates properly when provided with a custom workflow
+ # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
+ mock_workflow = _setup_mock_workflow_in_service(request)
+ execution = _get_compiler(request, mock_workflow).compile()
+ assert len(execution.tasks) == 2 # mock workflow creates only start workflow and end workflow task
+
+
+def test_existing_active_executions(request, service, model):
+ existing_active_execution = models.Execution(
+ service=service,
+ status=models.Execution.STARTED,
+ workflow_name='uninstall')
+ model.execution.put(existing_active_execution)
+ with pytest.raises(exceptions.ActiveExecutionsError):
+ _get_compiler(request, 'install').compile()
+
+
+def test_existing_executions_but_no_active_ones(request, service, model):
+ existing_terminated_execution = models.Execution(
+ service=service,
+ status=models.Execution.SUCCEEDED,
+ workflow_name='uninstall')
+ model.execution.put(existing_terminated_execution)
+ # no active executions exist, so no error should be raised
+ _get_compiler(request, 'install').compile()
+
+
+def test_default_executor(request):
+ # validates the ProcessExecutor is used by the workflow runner by default
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ with mock.patch('aria.orchestrator.execution.runner.engine.Engine') as mock_engine_cls:
+ execution = _get_compiler(request, mock_workflow).compile()
+ orch_execution.ExecutionRunner(ProcessExecutor())
+ _, engine_kwargs = mock_engine_cls.call_args
+ assert isinstance(engine_kwargs.get('executors').values()[0], ProcessExecutor)
+
+
+def test_custom_executor(request):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ custom_executor = mock.MagicMock()
+ with mock.patch('aria.orchestrator.execution.runner.engine.Engine') as mock_engine_cls:
+ execution = _get_compiler(request, mock_workflow).compile(executor=custom_executor)
+ orch_execution.ExecutionRunner(custom_executor)
+ _, engine_kwargs = mock_engine_cls.call_args
+ assert engine_kwargs.get('executors').values()[0] == custom_executor
+
+
+def test_task_configuration_parameters(request):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ task_max_attempts = 5
+ task_retry_interval = 7
+ with mock.patch('aria.orchestrator.execution.runner.engine.Engine.execute') as \
+ mock_engine_execute:
+ compiler = _get_compiler(request, mock_workflow)
+ execution = compiler.compile(task_max_attempts=task_max_attempts,
+ task_retry_interval=task_retry_interval)
+ orch_execution.ExecutionRunner(ProcessExecutor()).execute(compiler.workflow_ctx)
+ _, engine_kwargs = mock_engine_execute.call_args
+ assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts
+ assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval
+
+
+def test_execute(request, service):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ mock_engine = mock.MagicMock()
+ with mock.patch('aria.orchestrator.execution.runner.engine.Engine.execute',
+ return_value=mock_engine) as mock_engine_execute:
+ compiler = _get_compiler(request, mock_workflow).compile()
+ compiler.compile()
+
+ runner = orch_execution.runner.ExecutionRunner(ProcessExecutor())
+ runner.execute(compiler.workflow_ctx)
+
+ _, engine_kwargs = mock_engine_execute.call_args
+ assert engine_kwargs['ctx'].service.id == service.id
+ assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
+
+ mock_engine_execute.assert_called_once_with(ctx=compiler.workflow_ctx,
+ resuming=False,
+ retry_failed=False)
+
+
+def test_cancel_execution(request):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ mock_engine = mock.MagicMock()
+ with mock.patch('aria.orchestrator.execution.runner.engine.Engine', return_value=mock_engine):
+ compiler = _get_compiler(request, mock_workflow)
+ execution = compiler.compile()
+
+ runner = orch_execution.ExecutionRunner(ProcessExecutor())
+ runner.cancel(ctx=compiler.workflow_ctx)
+ mock_engine.cancel_execution.assert_called_once_with(compiler.workflow_ctx)
+
+
+def test_execution_model_creation(request, service):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ with mock.patch('aria.orchestrator.execution.runner.engine.Engine'):
+ execution = _get_compiler(request, mock_workflow).compile()
+
+ assert execution.service.id == service.id
+ assert execution.workflow_name == mock_workflow
+ assert execution.created_at <= datetime.utcnow()
+ assert execution.inputs == dict()
+
+
+def test_execution_inputs_override_workflow_inputs(request):
+ wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5}
+ mock_workflow = _setup_mock_workflow_in_service(
+ request,
+ inputs=dict((name, models.Input.wrap(name, val)) for name, val
+ in wf_inputs.iteritems()))
+
+ with mock.patch('aria.orchestrator.execution.runner.engine.Engine'):
+ execution = _get_compiler(request, mock_workflow).compile(
+ execution_inputs={'input2': 'overriding-value2', 'input3': 7}
+ )
+
+ assert len(execution.inputs) == 3
+ # did not override input1 - expecting the default value from the workflow inputs
+ assert execution.inputs['input1'].value == 'value1'
+ # overrode input2
+ assert execution.inputs['input2'].value == 'overriding-value2'
+ # overrode input of integer type
+ assert execution.inputs['input3'].value == 7
+
+
+def test_execution_inputs_undeclared_inputs(request):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ with pytest.raises(modeling_exceptions.UndeclaredInputsException):
+ _get_compiler(request, mock_workflow).compile(
+ execution_inputs={'undeclared_input': 'value'})
+
+
+def test_execution_inputs_missing_required_inputs(request):
+ mock_workflow = _setup_mock_workflow_in_service(
+ request, inputs={'required_input': models.Input.wrap('required_input', value=None)})
+
+ with pytest.raises(modeling_exceptions.MissingRequiredInputsException):
+ _get_compiler(request, mock_workflow).compile(execution_inputs={})
+
+
+def test_execution_inputs_wrong_type_inputs(request):
+ mock_workflow = _setup_mock_workflow_in_service(
+ request, inputs={'input': models.Input.wrap('input', 'value')})
+
+ with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException):
+ _get_compiler(request, mock_workflow).compile(execution_inputs={'input': 5})
+
+
+def test_execution_inputs_builtin_workflow_with_inputs(request):
+ # built-in workflows don't have inputs
+ with pytest.raises(modeling_exceptions.UndeclaredInputsException):
+ _get_compiler(request, 'install').compile(execution_inputs={'undeclared_input': 'value'})
+
+
+def test_workflow_function_parameters(request, tmpdir):
+ # validating the workflow function is passed with the
+ # merged execution inputs, in dict form
+
+ # the workflow function parameters will be written to this file
+ output_path = str(tmpdir.join('output'))
+ wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5}
+
+ mock_workflow = _setup_mock_workflow_in_service(
+ request, inputs=dict((name, models.Input.wrap(name, val)) for name, val
+ in wf_inputs.iteritems()))
+
+ _get_compiler(request, mock_workflow).compile(
+ execution_inputs={'input2': 'overriding-value2', 'input3': 7})
+
+ with open(output_path) as f:
+ wf_call_kwargs = json.load(f)
+ assert len(wf_call_kwargs) == 3
+ assert wf_call_kwargs.get('input1') == 'value1'
+ assert wf_call_kwargs.get('input2') == 'overriding-value2'
+ assert wf_call_kwargs.get('input3') == 7
+
+
+@pytest.fixture
+def service(model):
+ # sets up a service in the storage
+ service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
+ service = model.service.get(service_id)
+ return service
+
+
+def _setup_mock_workflow_in_service(request, inputs=None):
+ # sets up a mock workflow as part of the service, including uploading
+ # the workflow code to the service's dir on the resource storage
+ service = request.getfixturevalue('service')
+ resource = request.getfixturevalue('resource')
+
+ source = tests_mock.workflow.__file__
+ resource.service_template.upload(str(service.service_template.id), source)
+ mock_workflow_name = 'test_workflow'
+ arguments = {}
+ if inputs:
+ for input in inputs.itervalues():
+ arguments[input.name] = input.as_argument()
+ workflow = models.Operation(
+ name=mock_workflow_name,
+ service=service,
+ function='workflow.mock_workflow',
+ inputs=inputs or {},
+ arguments=arguments)
+ service.workflows[mock_workflow_name] = workflow
+ return mock_workflow_name
+
+
+def _get_compiler(request, workflow_name):
+ # helper method for instantiating a workflow runner
+ service = request.getfixturevalue('service')
+ model = request.getfixturevalue('model')
+ resource = request.getfixturevalue('resource')
+ plugin_manager = request.getfixturevalue('plugin_manager')
+
+ return orch_execution.ExecutionCompiler(
+ model,
+ resource,
+ plugin_manager,
+ service,
+ workflow_name
+ )
+
+
+class TestResumableWorkflows(object):
+
+ def _create_initial_workflow_runner(
+ self,
+ model,
+ resource,
+ service,
+ workflow,
+ executor,
+ inputs=None):
+
+ service.workflows['custom_workflow'] = tests_mock.models.create_operation(
+ 'custom_workflow',
+ operation_kwargs={
+ 'function': '{0}.{1}'.format(__name__, workflow.__name__),
+ 'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
+ }
+ )
+ model.service.update(service)
+ compiler = orch_execution.ExecutionCompiler(
+ model, resource, None, service, 'custom_workflow'
+ )
+ execution = compiler.compile(inputs, executor)
+ model.execution.update(execution)
+
+ return orch_execution.ExecutionRunner(executor), compiler.workflow_ctx
+
+ @staticmethod
+ def _wait_for_active_and_cancel(execution_runner, ctx):
+ if custom_events['is_active'].wait(60) is False:
+ raise TimeoutError("is_active wasn't set to True")
+ execution_runner.cancel(ctx)
+ if custom_events['execution_cancelled'].wait(60) is False:
+ raise TimeoutError("Execution did not end")
+
+ def test_resume_workflow(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_pass_first_task_only)
+ runner, ctx = self._create_initial_workflow_runner(
+ workflow_context.model,
+ workflow_context.resource,
+ workflow_context.model.service.list()[0],
+ mock_parallel_tasks_workflow,
+ thread_executor,
+ inputs={'number_of_tasks': 2}
+ )
+
+ wf_thread = Thread(target=runner.execute, kwargs=dict(ctx=ctx))
+ wf_thread.daemon = True
+ wf_thread.start()
+
+ # Wait for the execution to start
+ self._wait_for_active_and_cancel(runner, ctx)
+ node = ctx.model.node.refresh(node)
+
+ tasks = ctx.model.task.list(filters={'_stub_type': None})
+ assert any(task.status == task.SUCCESS for task in tasks)
+ assert any(task.status == task.RETRYING for task in tasks)
+ custom_events['is_resumed'].set()
+ assert any(task.status == task.RETRYING for task in tasks)
+
+ # Create a new workflow runner, with an existing execution id. This would cause
+ # the old execution to restart.
+ new_wf_runner = orch_execution.ExecutionRunner(thread_executor, True)
+ new_wf_runner.execute(ctx)
+
+ # Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
+ assert all(task.status == task.SUCCESS for task in tasks)
+ assert node.attributes['invocations'].value == 3
+ assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+ def test_resume_started_task(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_stuck_task)
+
+ wf_runner, ctx = self._create_initial_workflow_runner(
+ workflow_context.model,
+ workflow_context.resource,
+ workflow_context.model.service.list()[0],
+ mock_parallel_tasks_workflow,
+ thread_executor,
+ inputs={'number_of_tasks': 1}
+ )
+
+ wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx))
+ wf_thread.daemon = True
+ wf_thread.start()
+
+ self._wait_for_active_and_cancel(wf_runner, ctx)
+ node = workflow_context.model.node.refresh(node)
+ task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+ assert node.attributes['invocations'].value == 1
+ assert task.status == task.STARTED
+ assert ctx.execution.status in (ctx.execution.CANCELLED, ctx.execution.CANCELLING)
+ custom_events['is_resumed'].set()
+
+ new_thread_executor = thread.ThreadExecutor()
+ try:
+ new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True)
+
+ new_wf_runner.execute(ctx)
+ finally:
+ new_thread_executor.close()
+
+ # Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
+ assert node.attributes['invocations'].value == 2
+ assert task.status == task.SUCCESS
+ assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+ def test_resume_failed_task(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_failed_before_resuming)
+
+ wf_runner, ctx = self._create_initial_workflow_runner(
+ workflow_context.model,
+ workflow_context.resource,
+ workflow_context.model.service.list()[0],
+ mock_parallel_tasks_workflow,
+ thread_executor)
+ wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx))
+ wf_thread.setDaemon(True)
+ wf_thread.start()
+
+ self._wait_for_active_and_cancel(wf_runner, ctx)
+ node = workflow_context.model.node.refresh(node)
+
+ task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+ assert node.attributes['invocations'].value == 2
+ assert task.status == task.STARTED
+ assert ctx.execution.status in (ctx.execution.CANCELLED, ctx.execution.CANCELLING)
+
+ custom_events['is_resumed'].set()
+ assert node.attributes['invocations'].value == 2
+
+ # Create a new workflow runner, with an existing execution id. This would cause
+ # the old execution to restart.
+ new_thread_executor = thread.ThreadExecutor()
+ try:
+ new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True)
+
+ new_wf_runner.execute(ctx)
+ finally:
+ new_thread_executor.close()
+
+ # Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
+ assert node.attributes['invocations'].value == task.max_attempts - 1
+ assert task.status == task.SUCCESS
+ assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+ def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_pass_first_task_only)
+
+ wf_runner, ctx = self._create_initial_workflow_runner(
+ workflow_context.model,
+ workflow_context.resource,
+ workflow_context.model.service.list()[0],
+ mock_parallel_tasks_workflow,
+ thread_executor,
+ inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
+ )
+ wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx))
+ wf_thread.setDaemon(True)
+ wf_thread.start()
+
+ if custom_events['execution_failed'].wait(60) is False:
+ raise TimeoutError("Execution did not end")
+
+ tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+ node = workflow_context.model.node.refresh(node)
+ assert node.attributes['invocations'].value == 3
+ failed_task = [t for t in tasks if t.status == t.FAILED][0]
+
+ # First task passes
+ assert any(task.status == task.FAILED for task in tasks)
+ assert failed_task.attempts_count == 2
+ # Second task fails
+ assert any(task.status == task.SUCCESS for task in tasks)
+ assert ctx.execution.status in ctx.execution.FAILED
+
+ custom_events['is_resumed'].set()
+ new_thread_executor = thread.ThreadExecutor()
+ try:
+ new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True, True)
+ new_wf_runner.execute(ctx)
+ finally:
+ new_thread_executor.close()
+
+ # Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
+ assert failed_task.attempts_count == 1
+ assert node.attributes['invocations'].value == 4
+ assert all(task.status == task.SUCCESS for task in tasks)
+ assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+ def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_fail_first_task_only)
+
+ execution_runner, ctx = self._create_initial_workflow_runner(
+ workflow_context.model,
+ workflow_context.resource,
+ workflow_context.model.service.list()[0],
+ mock_sequential_tasks_workflow,
+ thread_executor,
+ inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
+ )
+ wf_thread = Thread(target=execution_runner.execute, kwargs=dict(ctx=ctx))
+ wf_thread.setDaemon(True)
+ wf_thread.start()
+
+ if custom_events['execution_failed'].wait(60) is False:
+ raise TimeoutError("Execution did not end")
+
+ tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+ node = workflow_context.model.node.refresh(node)
+ assert node.attributes['invocations'].value == 1
+ assert any(t.status == t.FAILED for t in tasks)
+ assert any(t.status == t.PENDING for t in tasks)
+
+ custom_events['is_resumed'].set()
+ new_thread_executor = thread.ThreadExecutor()
+ try:
+ new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, resume=True)
+ new_wf_runner.execute(ctx)
+ finally:
+ new_thread_executor.close()
+
+ # Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
+ assert node.attributes['invocations'].value == 2
+ assert any(t.status == t.SUCCESS for t in tasks)
+ assert any(t.status == t.FAILED for t in tasks)
+ assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+ @staticmethod
+ @pytest.fixture
+ def thread_executor():
+ result = thread.ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
+
+ @staticmethod
+ @pytest.fixture
+ def workflow_context(tmpdir):
+ workflow_context = tests_mock.context.simple(str(tmpdir))
+ yield workflow_context
+ storage.release_sqlite_storage(workflow_context.model)
+
+ @staticmethod
+ def _create_interface(ctx, node, func, arguments=None):
+ interface_name = 'aria.interfaces.lifecycle'
+ operation_kwargs = dict(function='{name}.{func.__name__}'.format(
+ name=__name__, func=func))
+ if arguments:
+ # the operation has to declare the arguments before those may be passed
+ operation_kwargs['arguments'] = arguments
+ operation_name = 'create'
+ interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
+ operation_kwargs=operation_kwargs)
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ return node, interface_name, operation_name
+
+ @staticmethod
+ def _engine(workflow_func, workflow_context, executor):
+ graph = workflow_func(ctx=workflow_context)
+ execution = workflow_context.execution
+ graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile(graph)
+ workflow_context.execution = execution
+
+ return engine.Engine(executors={executor.__class__: executor})
+
+ @pytest.fixture(autouse=True)
+ def register_to_events(self):
+ def execution_cancelled(*args, **kwargs):
+ custom_events['execution_cancelled'].set()
+
+ def execution_failed(*args, **kwargs):
+ custom_events['execution_failed'].set()
+
+ events.on_cancelled_workflow_signal.connect(execution_cancelled)
+ events.on_failure_workflow_signal.connect(execution_failed)
+ yield
+ events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
+ events.on_failure_workflow_signal.disconnect(execution_failed)
+ for event in custom_events.values():
+ event.clear()
+
+
+@workflow
+def mock_sequential_tasks_workflow(ctx, graph,
+ retry_interval=1, max_attempts=10, number_of_tasks=1):
+ node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
+
+
+@workflow
+def mock_parallel_tasks_workflow(ctx, graph,
+ retry_interval=1, max_attempts=10, number_of_tasks=1):
+ node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
+
+
+def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
+ return [
+ api.task.OperationTask(node,
+ 'aria.interfaces.lifecycle',
+ 'create',
+ retry_interval=retry_interval,
+ max_attempts=max_attempts)
+ for _ in xrange(number_of_tasks)
+ ]
+
+
+@operation
+def mock_failed_before_resuming(ctx):
+ """
+ The task should run atmost ctx.task.max_attempts - 1 times, and only then pass.
+ overall, the number of invocations should be ctx.task.max_attempts - 1
+ """
+ ctx.node.attributes['invocations'] += 1
+
+ if ctx.node.attributes['invocations'] == 2:
+ custom_events['is_active'].set()
+ # unfreeze the thread only when all of the invocations are done
+ while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
+ time.sleep(5)
+
+ elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
+ # pass only just before the end.
+ return
+ else:
+ # fail o.w.
+ raise FailingTask("stop this task")
+
+
+@operation
+def mock_stuck_task(ctx):
+ ctx.node.attributes['invocations'] += 1
+ while not custom_events['is_resumed'].isSet():
+ if not custom_events['is_active'].isSet():
+ custom_events['is_active'].set()
+ time.sleep(5)
+
+
+@operation
+def mock_pass_first_task_only(ctx):
+ ctx.node.attributes['invocations'] += 1
+
+ if ctx.node.attributes['invocations'] != 1:
+ custom_events['is_active'].set()
+ if not custom_events['is_resumed'].isSet():
+ # if resume was called, increase by one. o/w fail the execution - second task should
+ # fail as long it was not a part of resuming the workflow
+ raise FailingTask("wasn't resumed yet")
+
+
+@operation
+def mock_fail_first_task_only(ctx):
+ ctx.node.attributes['invocations'] += 1
+
+ if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1:
+ raise FailingTask("First task should fail")
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 7f33318..467ed36 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -500,7 +500,8 @@ if __name__ == '__main__':
arguments=arguments))
return graph
tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter
- graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
+ graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile(
+ tasks_graph)
eng = engine.Engine({executor.__class__: executor})
eng.execute(workflow_context)
return workflow_context.model.node.get_by_name(
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index b5df939..8992a04 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -262,7 +262,7 @@ class TestWithActualSSHServer(object):
return graph
tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter
graph_compiler.GraphCompiler(
- self._workflow_context, self._executor.__class__).compile(tasks_graph)
+ self._workflow_context.execution, self._executor.__class__).compile(tasks_graph)
eng = engine.Engine({self._executor.__class__: self._executor})
eng.execute(self._workflow_context)
return self._workflow_context.model.node.get_by_name(