You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/04/10 09:51:25 UTC
[16/18] incubator-ariatosca git commit: added tests for workflow
runner
added tests for workflow runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5b245b4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5b245b4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5b245b4a
Branch: refs/heads/logger_task
Commit: 5b245b4a6ff56f392601aa55ca6c4c5fb311bb38
Parents: e898e10
Author: Ran Ziv <ra...@gigaspaces.com>
Authored: Sun Apr 9 17:30:20 2017 +0300
Committer: Ran Ziv <ra...@gigaspaces.com>
Committed: Sun Apr 9 17:30:20 2017 +0300
----------------------------------------------------------------------
aria/modeling/exceptions.py | 2 +-
aria/modeling/utils.py | 2 +-
aria/orchestrator/workflow_runner.py | 30 +--
aria/utils/type.py | 9 +-
tests/mock/workflow.py | 26 +++
tests/orchestrator/test_workflow_runner.py | 293 ++++++++++++++++++++++++
6 files changed, 341 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/aria/modeling/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/modeling/exceptions.py b/aria/modeling/exceptions.py
index f699560..8225f37 100644
--- a/aria/modeling/exceptions.py
+++ b/aria/modeling/exceptions.py
@@ -40,7 +40,7 @@ class MissingRequiredInputsException(ModelingException):
"""
-class InputOfWrongTypeException(ModelingException):
+class InputsOfWrongTypeException(ModelingException):
"""
ARIA modeling exception: Inputs of the wrong types have been provided
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/aria/modeling/utils.py
----------------------------------------------------------------------
diff --git a/aria/modeling/utils.py b/aria/modeling/utils.py
index 34c2ac7..acae065 100644
--- a/aria/modeling/utils.py
+++ b/aria/modeling/utils.py
@@ -96,7 +96,7 @@ def _merge_and_validate_inputs(inputs, template_inputs):
for param_name, param_type in wrong_type_inputs.iteritems():
error_message.write('Input "{0}" must be of type {1}\n'.
format(param_name, param_type))
- raise exceptions.InputOfWrongTypeException(error_message.getvalue())
+ raise exceptions.InputsOfWrongTypeException(error_message.getvalue())
undeclared_inputs = [input_name for input_name in inputs.keys()
if input_name not in template_inputs]
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index e2ed3cf..1cdf1de 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -26,8 +26,8 @@ from .context.workflow import WorkflowContext
from .workflows.builtin import BUILTIN_WORKFLOWS, BUILTIN_WORKFLOWS_PATH_PREFIX
from .workflows.core.engine import Engine
from .workflows.executor.process import ProcessExecutor
-from ..modeling import utils as modeling_utils
from ..modeling import models
+from ..modeling import utils as modeling_utils
from ..utils.imports import import_fullname
@@ -100,8 +100,6 @@ class WorkflowRunner(object):
return self._model_storage.service.get(self._service_id)
def execute(self):
- #TODO uncomment, commented for testing purposes
- # self._validate_no_active_executions()
self._engine.execute()
def cancel(self):
@@ -114,15 +112,16 @@ class WorkflowRunner(object):
workflow_name=self._workflow_name,
inputs={})
- # built-in workflows don't have any inputs, and are also
- # not a part of the service's workflows field
- if self._workflow_name not in BUILTIN_WORKFLOWS:
- workflow_inputs = {k: v for k, v in
- self.service.workflows[self._workflow_name].inputs
- if k not in WORKFLOW_POLICY_INTERNAL_PROPERTIES}
-
- execution.inputs = modeling_utils.create_inputs(inputs, workflow_inputs)
-
+ if self._workflow_name in BUILTIN_WORKFLOWS:
+ workflow_inputs = dict() # built-in workflows don't have any inputs
+ else:
+ workflow_inputs = dict((k, v) for k, v in
+ self.service.workflows[self._workflow_name].inputs.iteritems()
+ if k not in WORKFLOW_POLICY_INTERNAL_PROPERTIES)
+
+ execution.inputs = modeling_utils.create_inputs(inputs, workflow_inputs)
+ # TODO: these two following calls should execute atomically
+ self._validate_no_active_executions(execution)
self._model_storage.execution.put(execution)
return execution
@@ -133,11 +132,12 @@ class WorkflowRunner(object):
'No workflow policy {0} declared in service {1}'
.format(self._workflow_name, self.service.name))
- def _validate_no_active_executions(self):
- active_executions = [e for e in self.service.executions if e.is_active()]
+ def _validate_no_active_executions(self, execution):
+ active_executions = [e for e in self.service.executions
+ if e.id != execution.id and e.is_active()]
if active_executions:
raise exceptions.ActiveExecutionsError(
- "Can't start execution; Service {0} has a running execution with id {1}"
+ "Can't start execution; Service {0} has an active execution with id {1}"
.format(self.service.name, active_executions[0].id))
def _get_workflow_fn(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/aria/utils/type.py
----------------------------------------------------------------------
diff --git a/aria/utils/type.py b/aria/utils/type.py
index abcf422..fff0f2a 100644
--- a/aria/utils/type.py
+++ b/aria/utils/type.py
@@ -36,11 +36,12 @@ def validate_value_type(value, type_name):
'float': float
}
- type = name_to_type.get(type_name.lower())
- if type is None:
+ type_ = name_to_type.get(type_name.lower())
+ if type_ is None:
raise RuntimeError('No supported type_name was provided')
- # validating value type - ValueError will be raised on type mismatch
- type(value)
+
+ if type(value) != type_:
+ raise ValueError('Value {0} is not of type {1}'.format(value, type_name))
def convert_value_to_type(str_value, type_name):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/tests/mock/workflow.py
----------------------------------------------------------------------
diff --git a/tests/mock/workflow.py b/tests/mock/workflow.py
new file mode 100644
index 0000000..b12b9fa
--- /dev/null
+++ b/tests/mock/workflow.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from aria.orchestrator.decorators import workflow
+
+
+@workflow
+def mock_workflow(graph, ctx, output_path=None, **kwargs): # pylint: disable=unused-argument
+ if output_path:
+ # writes call arguments to the specified output file
+ with open(output_path, 'w') as f:
+ json.dump(kwargs, f)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
new file mode 100644
index 0000000..aa89ac5
--- /dev/null
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -0,0 +1,293 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from datetime import datetime
+
+import pytest
+import mock
+
+from aria.modeling import exceptions as modeling_exceptions
+from aria.modeling import models
+from aria.orchestrator import exceptions
+from aria.orchestrator.workflow_runner import WorkflowRunner
+from aria.orchestrator.workflows.executor.process import ProcessExecutor
+
+from ..mock import (
+ topology,
+ workflow as workflow_mocks
+)
+from ..fixtures import ( # pylint: disable=unused-import
+ plugins_dir,
+ plugin_manager,
+ fs_model as model,
+ resource_storage as resource
+)
+
+
+def test_undeclared_workflow(request):
+ # validating a proper error is raised when the workflow is not declared in the service
+ with pytest.raises(exceptions.UndeclaredWorkflowError):
+ _create_workflow_runner(request, 'undeclared_workflow')
+
+
+def test_missing_workflow_implementation(service, request):
+ # validating a proper error is raised when the workflow code path does not exist
+ workflow = models.Operation(
+ name='test_workflow',
+ service=service,
+ implementation='nonexistent.workflow.implementation',
+ inputs={})
+ service.workflows['test_workflow'] = workflow
+
+ with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
+ _create_workflow_runner(request, 'test_workflow')
+
+
+def test_builtin_workflow_instantiation(request):
+ # validates the workflow runner instantiates properly when provided with a builtin workflow
+ # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
+ workflow_runner = _create_workflow_runner(request, 'install')
+ tasks = list(workflow_runner._tasks_graph.tasks)
+ assert len(tasks) == 2 # expecting two WorkflowTasks
+
+
+def test_custom_workflow_instantiation(request):
+ # validates the workflow runner instantiates properly when provided with a custom workflow
+ # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
+ mock_workflow = _setup_mock_workflow_in_service(request)
+ workflow_runner = _create_workflow_runner(request, mock_workflow)
+ tasks = list(workflow_runner._tasks_graph.tasks)
+ assert len(tasks) == 0 # mock workflow creates no tasks
+
+
+def test_existing_active_executions(request, service, model):
+ existing_active_execution = models.Execution(
+ service=service,
+ status=models.Execution.STARTED,
+ workflow_name='uninstall')
+ model.execution.put(existing_active_execution)
+ with pytest.raises(exceptions.ActiveExecutionsError):
+ _create_workflow_runner(request, 'install')
+
+
+def test_existing_executions_but_no_active_ones(request, service, model):
+ existing_terminated_execution = models.Execution(
+ service=service,
+ status=models.Execution.TERMINATED,
+ workflow_name='uninstall')
+ model.execution.put(existing_terminated_execution)
+ # no active executions exist, so no error should be raised
+ _create_workflow_runner(request, 'install')
+
+
+def test_default_executor(request):
+ # validates the ProcessExecutor is used by the workflow runner by default
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
+ _create_workflow_runner(request, mock_workflow)
+ _, engine_kwargs = mock_engine_cls.call_args
+ assert isinstance(engine_kwargs.get('executor'), ProcessExecutor)
+
+
+def test_custom_executor(request):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ custom_executor = mock.MagicMock()
+ with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
+ _create_workflow_runner(request, mock_workflow, executor=custom_executor)
+ _, engine_kwargs = mock_engine_cls.call_args
+ assert engine_kwargs.get('executor') == custom_executor
+
+
+def test_task_configuration_parameters(request):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ task_max_attempts = 5
+ task_retry_interval = 7
+ with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
+ _create_workflow_runner(request, mock_workflow, task_max_attempts=task_max_attempts,
+ task_retry_interval=task_retry_interval)
+ _, engine_kwargs = mock_engine_cls.call_args
+ assert engine_kwargs['workflow_context']._task_max_attempts == task_max_attempts
+ assert engine_kwargs['workflow_context']._task_retry_interval == task_retry_interval
+
+
+def test_execute(request, service):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ mock_engine = mock.MagicMock()
+ with mock.patch('aria.orchestrator.workflow_runner.Engine', return_value=mock_engine) \
+ as mock_engine_cls:
+ workflow_runner = _create_workflow_runner(request, mock_workflow)
+
+ _, engine_kwargs = mock_engine_cls.call_args
+ assert engine_kwargs['workflow_context'].service.id == service.id
+ assert engine_kwargs['workflow_context'].execution.workflow_name == 'test_workflow'
+
+ workflow_runner.execute()
+ mock_engine.execute.assert_called_once_with()
+
+
+def test_cancel_execution(request):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ mock_engine = mock.MagicMock()
+ with mock.patch('aria.orchestrator.workflow_runner.Engine', return_value=mock_engine):
+ workflow_runner = _create_workflow_runner(request, mock_workflow)
+ workflow_runner.cancel()
+ mock_engine.cancel_execution.assert_called_once_with()
+
+
+def test_execution_model_creation(request, service, model):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
+ workflow_runner = _create_workflow_runner(request, mock_workflow)
+
+ _, engine_kwargs = mock_engine_cls.call_args
+ assert engine_kwargs['workflow_context'].execution == workflow_runner.execution
+ assert model.execution.get(workflow_runner.execution.id) == workflow_runner.execution
+ assert workflow_runner.execution.service.id == service.id
+ assert workflow_runner.execution.workflow_name == mock_workflow
+ assert workflow_runner.execution.created_at <= datetime.utcnow()
+ assert workflow_runner.execution.inputs == dict()
+
+
+def test_execution_inputs_override_workflow_inputs(request):
+ wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5}
+ mock_workflow = _setup_mock_workflow_in_service(
+ request,
+ inputs=dict((name, models.Parameter.wrap(name, val)) for name, val
+ in wf_inputs.iteritems()))
+
+ with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
+ workflow_runner = _create_workflow_runner(
+ request, mock_workflow, inputs={'input2': 'overriding-value2', 'input3': 7})
+
+ _, engine_kwargs = mock_engine_cls.call_args
+ assert len(workflow_runner.execution.inputs) == 3
+ # did not override input1 - expecting the default value from the workflow inputs
+ assert workflow_runner.execution.inputs['input1'].value == 'value1'
+ # overrode input2
+ assert workflow_runner.execution.inputs['input2'].value == 'overriding-value2'
+ # overrode input of integer type
+ assert workflow_runner.execution.inputs['input3'].value == 7
+
+
+def test_execution_inputs_undeclared_inputs(request):
+ mock_workflow = _setup_mock_workflow_in_service(request)
+
+ with pytest.raises(modeling_exceptions.UndeclaredInputsException):
+ _create_workflow_runner(request, mock_workflow, inputs={'undeclared_input': 'value'})
+
+
+def test_execution_inputs_missing_required_inputs(request):
+ mock_workflow = _setup_mock_workflow_in_service(
+ request, inputs={'required_input': models.Parameter.wrap('required_input', value=None)})
+
+ with pytest.raises(modeling_exceptions.MissingRequiredInputsException):
+ _create_workflow_runner(request, mock_workflow, inputs={})
+
+
+def test_execution_inputs_wrong_type_inputs(request):
+ mock_workflow = _setup_mock_workflow_in_service(
+ request, inputs={'input': models.Parameter.wrap('input', 'value')})
+
+ with pytest.raises(modeling_exceptions.InputsOfWrongTypeException):
+ _create_workflow_runner(request, mock_workflow, inputs={'input': 5})
+
+
+def test_execution_inputs_builtin_workflow_with_inputs(request):
+ # built-in workflows don't have inputs
+ with pytest.raises(modeling_exceptions.UndeclaredInputsException):
+ _create_workflow_runner(request, 'install', inputs={'undeclared_input': 'value'})
+
+
+def test_workflow_function_parameters(request, tmpdir):
+ # validating the workflow function is passed with the
+ # merged execution inputs, in dict form
+
+ # the workflow function parameters will be written to this file
+ output_path = str(tmpdir.join('output'))
+ wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5}
+
+ mock_workflow = _setup_mock_workflow_in_service(
+ request, inputs=dict((name, models.Parameter.wrap(name, val)) for name, val
+ in wf_inputs.iteritems()))
+
+ _create_workflow_runner(request, mock_workflow,
+ inputs={'input2': 'overriding-value2', 'input3': 7})
+
+ with open(output_path) as f:
+ wf_call_kwargs = json.load(f)
+ assert len(wf_call_kwargs) == 3
+ assert wf_call_kwargs.get('input1') == 'value1'
+ assert wf_call_kwargs.get('input2') == 'overriding-value2'
+ assert wf_call_kwargs.get('input3') == 7
+
+
+@pytest.fixture
+def service(model):
+ # sets up a service in the storage
+ service_id = topology.create_simple_topology_two_nodes(model)
+ service = model.service.get(service_id)
+ return service
+
+
+def _setup_mock_workflow_in_service(request, inputs=None):
+ # sets up a mock workflow as part of the service, including uploading
+ # the workflow code to the service's dir on the resource storage
+ service = request.getfuncargvalue('service')
+ resource = request.getfuncargvalue('resource')
+
+ source = workflow_mocks.__file__
+ resource.service_template.upload(str(service.service_template.id), source)
+ mock_workflow_name = 'test_workflow'
+ workflow = models.Operation(
+ name=mock_workflow_name,
+ service=service,
+ implementation='workflow.mock_workflow',
+ inputs=inputs or {})
+ service.workflows[mock_workflow_name] = workflow
+ return mock_workflow_name
+
+
+def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
+ task_max_attempts=None, task_retry_interval=None):
+ # helper method for instantiating a workflow runner
+ service_id = request.getfuncargvalue('service').id
+ model = request.getfuncargvalue('model')
+ resource = request.getfuncargvalue('resource')
+ plugin_manager = request.getfuncargvalue('plugin_manager')
+
+ # task configuration parameters can't be set to None, therefore only
+ # passing those if they've been set by the test
+ task_configuration_kwargs = dict()
+ if task_max_attempts is not None:
+ task_configuration_kwargs['task_max_attempts'] = task_max_attempts
+ if task_retry_interval is not None:
+ task_configuration_kwargs['task_retry_interval'] = task_retry_interval
+
+ return WorkflowRunner(
+ workflow_name=workflow_name,
+ service_id=service_id,
+ inputs=inputs or {},
+ executor=executor,
+ model_storage=model,
+ resource_storage=resource,
+ plugin_manager=plugin_manager,
+ **task_configuration_kwargs)