You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/13 09:45:46 UTC
[3/3] incubator-ariatosca git commit: ARIA-276 Support model
instrumentation for workflows
ARIA-276 Support model instrumentation for workflows
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4cde4d20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4cde4d20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4cde4d20
Branch: refs/heads/ARIA-276-Support-model-instrumentation-for-workflows
Commit: 4cde4d202d820911fbf5357d9e69ef2496fe7d73
Parents: 1e883c5
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu Jun 8 09:52:31 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 13 12:42:53 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/context/common.py | 7 +
aria/orchestrator/context/operation.py | 7 -
aria/orchestrator/decorators.py | 5 +-
aria/orchestrator/workflows/api/task.py | 2 -
aria/orchestrator/workflows/core/task.py | 12 +-
aria/storage/collection_instrumentation.py | 46 +--
.../context/test_collection_instrumentation.py | 325 -------------------
.../context/test_context_instrumentation.py | 108 ++++++
tests/orchestrator/context/test_serialize.py | 20 +-
tests/orchestrator/context/test_workflow.py | 93 ++++--
.../orchestrator/execution_plugin/test_local.py | 26 +-
tests/orchestrator/execution_plugin/test_ssh.py | 50 +--
.../workflows/builtin/test_execute_operation.py | 9 +-
.../orchestrator/workflows/core/test_engine.py | 88 +++--
.../executor/test_process_executor_extension.py | 24 +-
.../test_process_executor_tracked_changes.py | 26 +-
.../storage/test_collection_instrumentation.py | 257 +++++++++++++++
17 files changed, 627 insertions(+), 478 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index c98e026..f4df317 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -36,6 +36,13 @@ class BaseContext(object):
Base context object for workflow and operation
"""
+ INSTRUMENTATION_FIELDS = (
+ modeling.models.Node.attributes,
+ modeling.models.Node.properties,
+ modeling.models.NodeTemplate.attributes,
+ modeling.models.NodeTemplate.properties
+ )
+
class PrefixedLogger(object):
def __init__(self, base_logger, task_id=None):
self._logger = base_logger
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index af7220d..efdc04d 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -29,13 +29,6 @@ class BaseOperationContext(common.BaseContext):
Context object used during operation creation and execution
"""
- INSTRUMENTATION_FIELDS = (
- aria.modeling.models.Node.attributes,
- aria.modeling.models.Node.properties,
- aria.modeling.models.NodeTemplate.attributes,
- aria.modeling.models.NodeTemplate.properties
- )
-
def __init__(self, task_id, actor_id, **kwargs):
self._task_id = task_id
self._actor_id = actor_id
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/aria/orchestrator/decorators.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py
index 80f6962..389bfb8 100644
--- a/aria/orchestrator/decorators.py
+++ b/aria/orchestrator/decorators.py
@@ -49,8 +49,9 @@ def workflow(func=None, suffix_template=''):
workflow_parameters.setdefault('ctx', ctx)
workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name))
validate_function_arguments(func, workflow_parameters)
- with context.workflow.current.push(ctx):
- func(**workflow_parameters)
+ with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS):
+ with context.workflow.current.push(ctx):
+ func(**workflow_parameters)
return workflow_parameters['graph']
return _wrapper
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index bcba56e..ca125a8 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -108,8 +108,6 @@ class OperationTask(BaseTask):
``interface_name`` and ``operation_name`` to not refer to an operation on the actor
"""
- assert isinstance(actor, (models.Node, models.Relationship))
-
# Creating OperationTask directly should raise an error when there is no
# interface/operation.
if not has_operation(actor, interface_name, operation_name):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 72d83ea..d732f09 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -124,20 +124,22 @@ class OperationTask(BaseTask):
self.operation_name = api_task.operation_name
model_storage = api_task._workflow_context.model
+ actor = getattr(api_task.actor, '_wrapped', api_task.actor)
+
base_task_model = model_storage.task.model_cls
- if isinstance(api_task.actor, models.Node):
+ if isinstance(actor, models.Node):
context_cls = operation_context.NodeOperationContext
create_task_model = base_task_model.for_node
- elif isinstance(api_task.actor, models.Relationship):
+ elif isinstance(actor, models.Relationship):
context_cls = operation_context.RelationshipOperationContext
create_task_model = base_task_model.for_relationship
else:
raise RuntimeError('No operation context could be created for {actor.model_cls}'
- .format(actor=api_task.actor))
+ .format(actor=actor))
task_model = create_task_model(
name=api_task.name,
- actor=api_task.actor,
+ actor=actor,
status=base_task_model.PENDING,
max_attempts=api_task.max_attempts,
retry_interval=api_task.retry_interval,
@@ -156,7 +158,7 @@ class OperationTask(BaseTask):
resource_storage=self._workflow_context.resource,
service_id=self._workflow_context._service_id,
task_id=task_model.id,
- actor_id=api_task.actor.id,
+ actor_id=actor.id,
execution_id=self._workflow_context._execution_id,
workdir=self._workflow_context._workdir)
self._task_id = task_model.id
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/aria/storage/collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/collection_instrumentation.py b/aria/storage/collection_instrumentation.py
index 27d8322..454f97a 100644
--- a/aria/storage/collection_instrumentation.py
+++ b/aria/storage/collection_instrumentation.py
@@ -198,23 +198,28 @@ class _InstrumentedList(_InstrumentedCollection, list):
return list(self)
-class _InstrumentedModel(object):
+class _WrappedBase(object):
- def __init__(self, original_model, mapi, instrumentation):
+ def __init__(self, wrapped, instrumentation):
+ self._wrapped = wrapped
+ self._instrumentation = instrumentation
+
+
+class _InstrumentedModel(_WrappedBase):
+
+ def __init__(self, mapi, *args, **kwargs):
"""
The original model
- :param original_model: the model to be instrumented
+ :param wrapped: the model to be instrumented
:param mapi: the mapi for that model
"""
- super(_InstrumentedModel, self).__init__()
- self._original_model = original_model
+ super(_InstrumentedModel, self).__init__(*args, **kwargs)
self._mapi = mapi
- self._instrumentation = instrumentation
self._apply_instrumentation()
def __getattr__(self, item):
- return_value = getattr(self._original_model, item)
- if isinstance(return_value, self._original_model.__class__):
+ return_value = getattr(self._wrapped, item)
+ if isinstance(return_value, self._wrapped.__class__):
return _create_instrumented_model(return_value, self._mapi, self._instrumentation)
if isinstance(return_value, (list, dict)):
return _create_wrapped_model(return_value, self._mapi, self._instrumentation)
@@ -224,7 +229,7 @@ class _InstrumentedModel(object):
for field in self._instrumentation:
field_name = field.key
field_cls = field.mapper.class_
- field = getattr(self._original_model, field_name)
+ field = getattr(self._wrapped, field_name)
# Preserve the original value. e.g. original attributes would be located under
# _attributes
@@ -241,20 +246,20 @@ class _InstrumentedModel(object):
"ARIA supports instrumentation for dict and list. Field {field} of the "
"class {model} is of {type} type.".format(
field=field,
- model=self._original_model,
+ model=self._wrapped,
type=type(field)))
instrumented_class = instrumentation_cls(seq=field,
- parent=self._original_model,
+ parent=self._wrapped,
mapi=self._mapi,
field_name=field_name,
field_cls=field_cls)
setattr(self, field_name, instrumented_class)
-class _WrappedModel(object):
+class _WrappedModel(_WrappedBase):
- def __init__(self, wrapped, instrumentation, **kwargs):
+ def __init__(self, instrumentation_kwargs, *args, **kwargs):
"""
:param instrumented_cls: The class to be instrumented
@@ -262,9 +267,8 @@ class _WrappedModel(object):
:param wrapped: the currently wrapped instance
:param kwargs: and kwargs to the passed to the instrumented class.
"""
- self._kwargs = kwargs
- self._instrumentation = instrumentation
- self._wrapped = wrapped
+ super(_WrappedModel, self).__init__(*args, **kwargs)
+ self._kwargs = instrumentation_kwargs
def _wrap(self, value):
if value.__class__ in (class_.class_ for class_ in self._instrumentation):
@@ -286,16 +290,18 @@ class _WrappedModel(object):
return self._wrap(self._wrapped[item])
-def _create_instrumented_model(original_model, mapi, instrumentation, **kwargs):
+def _create_instrumented_model(original_model, mapi, instrumentation):
return type('Instrumented{0}'.format(original_model.__class__.__name__),
(_InstrumentedModel,),
- {})(original_model, mapi, instrumentation, **kwargs)
+ {})(wrapped=original_model, instrumentation=instrumentation, mapi=mapi)
-def _create_wrapped_model(original_model, mapi, instrumentation, **kwargs):
+def _create_wrapped_model(original_model, mapi, instrumentation):
return type('Wrapped{0}'.format(original_model.__class__.__name__),
(_WrappedModel, ),
- {})(original_model, instrumentation, mapi=mapi, **kwargs)
+ {})(wrapped=original_model,
+ instrumentation=instrumentation,
+ instrumentation_kwargs=dict(mapi=mapi))
def instrument(instrumentation, original_model, mapi):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/orchestrator/context/test_collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_collection_instrumentation.py b/tests/orchestrator/context/test_collection_instrumentation.py
deleted file mode 100644
index ae3e8ac..0000000
--- a/tests/orchestrator/context/test_collection_instrumentation.py
+++ /dev/null
@@ -1,325 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import pytest
-
-from aria.modeling import models
-from aria.storage import collection_instrumentation
-from aria.orchestrator.context import operation
-
-from tests import (
- mock,
- storage
-)
-
-
-class MockActor(object):
- def __init__(self):
- self.dict_ = {}
- self.list_ = []
-
-
-class MockMAPI(object):
-
- def __init__(self):
- pass
-
- def put(self, *args, **kwargs):
- pass
-
- def update(self, *args, **kwargs):
- pass
-
-
-class CollectionInstrumentation(object):
-
- @pytest.fixture
- def actor(self):
- return MockActor()
-
- @pytest.fixture
- def model(self):
- return MockMAPI()
-
- @pytest.fixture
- def dict_(self, actor, model):
- return collection_instrumentation._InstrumentedDict(model, actor, 'dict_', models.Attribute)
-
- @pytest.fixture
- def list_(self, actor, model):
- return collection_instrumentation._InstrumentedList(model, actor, 'list_', models.Attribute)
-
-
-class TestDict(CollectionInstrumentation):
-
- def test_keys(self, actor, dict_):
- dict_.update(
- {
- 'key1': models.Attribute.wrap('key1', 'value1'),
- 'key2': models.Attribute.wrap('key2', 'value2')
- }
- )
- assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
-
- def test_values(self, actor, dict_):
- dict_.update({
- 'key1': models.Attribute.wrap('key1', 'value1'),
- 'key2': models.Attribute.wrap('key1', 'value2')
- })
- assert (sorted(dict_.values()) ==
- sorted(['value1', 'value2']) ==
- sorted(v.value for v in actor.dict_.values()))
-
- def test_items(self, dict_):
- dict_.update({
- 'key1': models.Attribute.wrap('key1', 'value1'),
- 'key2': models.Attribute.wrap('key1', 'value2')
- })
- assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')])
-
- def test_iter(self, actor, dict_):
- dict_.update({
- 'key1': models.Attribute.wrap('key1', 'value1'),
- 'key2': models.Attribute.wrap('key1', 'value2')
- })
- assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
-
- def test_bool(self, dict_):
- assert not dict_
- dict_.update({
- 'key1': models.Attribute.wrap('key1', 'value1'),
- 'key2': models.Attribute.wrap('key1', 'value2')
- })
- assert dict_
-
- def test_set_item(self, actor, dict_):
- dict_['key1'] = models.Attribute.wrap('key1', 'value1')
- assert dict_['key1'] == 'value1' == actor.dict_['key1'].value
- assert isinstance(actor.dict_['key1'], models.Attribute)
-
- def test_nested(self, actor, dict_):
- dict_['key'] = {}
- assert isinstance(actor.dict_['key'], models.Attribute)
- assert dict_['key'] == actor.dict_['key'].value == {}
-
- dict_['key']['inner_key'] = 'value'
-
- assert len(dict_) == 1
- assert 'inner_key' in dict_['key']
- assert dict_['key']['inner_key'] == 'value'
- assert dict_['key'].keys() == ['inner_key']
- assert dict_['key'].values() == ['value']
- assert dict_['key'].items() == [('inner_key', 'value')]
- assert isinstance(actor.dict_['key'], models.Attribute)
- assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
-
- dict_['key'].update({'updated_key': 'updated_value'})
- assert len(dict_) == 1
- assert 'updated_key' in dict_['key']
- assert dict_['key']['updated_key'] == 'updated_value'
- assert sorted(dict_['key'].keys()) == sorted(['inner_key', 'updated_key'])
- assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value'])
- assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'),
- ('updated_key', 'updated_value')])
- assert isinstance(actor.dict_['key'], models.Attribute)
- assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
-
- dict_.update({'key': 'override_value'})
- assert len(dict_) == 1
- assert 'key' in dict_
- assert dict_['key'] == 'override_value'
- assert len(actor.dict_) == 1
- assert isinstance(actor.dict_['key'], models.Attribute)
- assert actor.dict_['key'].value == 'override_value'
-
- def test_get_item(self, actor, dict_):
- dict_['key1'] = models.Attribute.wrap('key1', 'value1')
- assert isinstance(actor.dict_['key1'], models.Attribute)
-
- def test_update(self, actor, dict_):
- dict_['key1'] = 'value1'
-
- new_dict = {'key2': 'value2'}
- dict_.update(new_dict)
- assert len(dict_) == 2
- assert dict_['key2'] == 'value2'
- assert isinstance(actor.dict_['key2'], models.Attribute)
-
- new_dict = {}
- new_dict.update(dict_)
- assert new_dict['key1'] == dict_['key1']
-
- def test_copy(self, dict_):
- dict_['key1'] = 'value1'
-
- new_dict = dict_.copy()
- assert new_dict is not dict_
- assert new_dict == dict_
-
- dict_['key1'] = 'value2'
- assert new_dict['key1'] == 'value1'
- assert dict_['key1'] == 'value2'
-
- def test_clear(self, dict_):
- dict_['key1'] = 'value1'
- dict_.clear()
-
- assert len(dict_) == 0
-
-
-class TestList(CollectionInstrumentation):
-
- def test_append(self, actor, list_):
- list_.append(models.Attribute.wrap('name', 'value1'))
- list_.append('value2')
- assert len(actor.list_) == 2
- assert len(list_) == 2
- assert isinstance(actor.list_[0], models.Attribute)
- assert list_[0] == 'value1'
-
- assert isinstance(actor.list_[1], models.Attribute)
- assert list_[1] == 'value2'
-
- list_[0] = 'new_value1'
- list_[1] = 'new_value2'
- assert isinstance(actor.list_[1], models.Attribute)
- assert isinstance(actor.list_[1], models.Attribute)
- assert list_[0] == 'new_value1'
- assert list_[1] == 'new_value2'
-
- def test_iter(self, list_):
- list_.append('value1')
- list_.append('value2')
- assert sorted(list_) == sorted(['value1', 'value2'])
-
- def test_insert(self, actor, list_):
- list_.append('value1')
- list_.insert(0, 'value2')
- list_.insert(2, 'value3')
- list_.insert(10, 'value4')
- assert sorted(list_) == sorted(['value1', 'value2', 'value3', 'value4'])
- assert len(actor.list_) == 4
-
- def test_set(self, list_):
- list_.append('value1')
- list_.append('value2')
-
- list_[1] = 'value3'
- assert len(list_) == 2
- assert sorted(list_) == sorted(['value1', 'value3'])
-
- def test_insert_into_nested(self, actor, list_):
- list_.append([])
-
- list_[0].append('inner_item')
- assert isinstance(actor.list_[0], models.Attribute)
- assert len(list_) == 1
- assert list_[0][0] == 'inner_item'
-
- list_[0].append('new_item')
- assert isinstance(actor.list_[0], models.Attribute)
- assert len(list_) == 1
- assert list_[0][1] == 'new_item'
-
- assert list_[0] == ['inner_item', 'new_item']
- assert ['inner_item', 'new_item'] == list_[0]
-
-
-class TestDictList(CollectionInstrumentation):
- def test_dict_in_list(self, actor, list_):
- list_.append({})
- assert len(list_) == 1
- assert isinstance(actor.list_[0], models.Attribute)
- assert actor.list_[0].value == {}
-
- list_[0]['key'] = 'value'
- assert list_[0]['key'] == 'value'
- assert len(actor.list_) == 1
- assert isinstance(actor.list_[0], models.Attribute)
- assert actor.list_[0].value['key'] == 'value'
-
- def test_list_in_dict(self, actor, dict_):
- dict_['key'] = []
- assert len(dict_) == 1
- assert isinstance(actor.dict_['key'], models.Attribute)
- assert actor.dict_['key'].value == []
-
- dict_['key'].append('value')
- assert dict_['key'][0] == 'value'
- assert len(actor.dict_) == 1
- assert isinstance(actor.dict_['key'], models.Attribute)
- assert actor.dict_['key'].value[0] == 'value'
-
-
-class TestModelInstrumentation(object):
-
- @pytest.fixture
- def workflow_ctx(self, tmpdir):
- context = mock.context.simple(str(tmpdir), inmemory=True)
- yield context
- storage.release_sqlite_storage(context.model)
-
- def test_attributes_access(self, workflow_ctx):
- node = workflow_ctx.model.node.list()[0]
- task = models.Task(node=node)
- workflow_ctx.model.task.put(task)
-
- ctx = operation.NodeOperationContext(
- task.id, node.id, name='', service_id=workflow_ctx.model.service.list()[0].id,
- model_storage=workflow_ctx.model, resource_storage=workflow_ctx.resource,
- execution_id=1)
-
- def _run_assertions(is_under_ctx):
- def ctx_assert(expr):
- if is_under_ctx:
- assert expr
- else:
- assert not expr
-
- ctx_assert(isinstance(ctx.node.attributes,
- collection_instrumentation._InstrumentedDict))
- assert not isinstance(ctx.node.properties,
- collection_instrumentation._InstrumentedCollection)
-
- for rel in ctx.node.inbound_relationships:
- ctx_assert(isinstance(rel, collection_instrumentation._WrappedModel))
- ctx_assert(isinstance(rel.source_node.attributes,
- collection_instrumentation._InstrumentedDict))
- ctx_assert(isinstance(rel.target_node.attributes,
- collection_instrumentation._InstrumentedDict))
-
- for node in ctx.model.node:
- ctx_assert(isinstance(node.attributes,
- collection_instrumentation._InstrumentedDict))
- assert not isinstance(node.properties,
- collection_instrumentation._InstrumentedCollection)
-
- for rel in ctx.model.relationship:
- ctx_assert(isinstance(rel, collection_instrumentation._WrappedModel))
-
- ctx_assert(isinstance(rel.source_node.attributes,
- collection_instrumentation._InstrumentedDict))
- ctx_assert(isinstance(rel.target_node.attributes,
- collection_instrumentation._InstrumentedDict))
-
- assert not isinstance(rel.source_node.properties,
- collection_instrumentation._InstrumentedCollection)
- assert not isinstance(rel.target_node.properties,
- collection_instrumentation._InstrumentedCollection)
-
- with ctx.model.instrument(models.Node.attributes):
- _run_assertions(True)
-
- _run_assertions(False)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/orchestrator/context/test_context_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_context_instrumentation.py b/tests/orchestrator/context/test_context_instrumentation.py
new file mode 100644
index 0000000..6cc8096
--- /dev/null
+++ b/tests/orchestrator/context/test_context_instrumentation.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.modeling import models
+from aria.storage import collection_instrumentation
+from aria.orchestrator.context import operation
+
+from tests import (
+ mock,
+ storage
+)
+
+
+class TestContextInstrumentation(object):
+
+ @pytest.fixture
+ def workflow_ctx(self, tmpdir):
+ context = mock.context.simple(str(tmpdir), inmemory=True)
+ yield context
+ storage.release_sqlite_storage(context.model)
+
+ def test_workflow_context_instrumentation(self, workflow_ctx):
+ with workflow_ctx.model.instrument(models.Node.attributes):
+ self._run_common_assertions(workflow_ctx, True)
+ self._run_common_assertions(workflow_ctx, False)
+
+ def test_operation_context_instrumentation(self, workflow_ctx):
+ node = workflow_ctx.model.node.list()[0]
+ task = models.Task(node=node)
+ workflow_ctx.model.task.put(task)
+
+ ctx = operation.NodeOperationContext(
+ task.id, node.id, name='', service_id=workflow_ctx.model.service.list()[0].id,
+ model_storage=workflow_ctx.model, resource_storage=workflow_ctx.resource,
+ execution_id=1)
+
+ with ctx.model.instrument(models.Node.attributes):
+ self._run_op_assertions(ctx, True)
+ self._run_common_assertions(ctx, True)
+
+ self._run_op_assertions(ctx, False)
+ self._run_common_assertions(ctx, False)
+
+ @staticmethod
+ def ctx_assert(expr, is_under_ctx):
+ if is_under_ctx:
+ assert expr
+ else:
+ assert not expr
+
+ def _run_op_assertions(self, ctx, is_under_ctx):
+ self.ctx_assert(isinstance(ctx.node.attributes,
+ collection_instrumentation._InstrumentedDict), is_under_ctx)
+ assert not isinstance(ctx.node.properties,
+ collection_instrumentation._InstrumentedCollection)
+
+ for rel in ctx.node.inbound_relationships:
+ self.ctx_assert(
+ isinstance(rel, collection_instrumentation._WrappedModel), is_under_ctx)
+ self.ctx_assert(
+ isinstance(rel.source_node.attributes,
+ collection_instrumentation._InstrumentedDict),
+ is_under_ctx)
+ self.ctx_assert(
+ isinstance(rel.target_node.attributes,
+ collection_instrumentation._InstrumentedDict),
+ is_under_ctx)
+
+ def _run_common_assertions(self, ctx, is_under_ctx):
+
+ for node in ctx.model.node:
+ self.ctx_assert(
+ isinstance(node.attributes, collection_instrumentation._InstrumentedDict),
+ is_under_ctx)
+ assert not isinstance(node.properties,
+ collection_instrumentation._InstrumentedCollection)
+
+ for rel in ctx.model.relationship:
+ self.ctx_assert(
+ isinstance(rel, collection_instrumentation._WrappedModel), is_under_ctx)
+
+ self.ctx_assert(
+ isinstance(rel.source_node.attributes,
+ collection_instrumentation._InstrumentedDict),
+ is_under_ctx)
+ self.ctx_assert(
+ isinstance(rel.target_node.attributes,
+ collection_instrumentation._InstrumentedDict),
+ is_under_ctx)
+
+ assert not isinstance(rel.source_node.properties,
+ collection_instrumentation._InstrumentedCollection)
+ assert not isinstance(rel.target_node.properties,
+ collection_instrumentation._InstrumentedCollection)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 4db7bf4..0919e81 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -33,16 +33,10 @@ def test_serialize_operation_context(context, executor, tmpdir):
test_file.write(TEST_FILE_CONTENT)
resource = context.resource
resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
- graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
- eng.execute()
-
-@workflow
-def _mock_workflow(ctx, graph):
- node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
plugin = mock.models.create_plugin()
- ctx.model.plugin.put(plugin)
+ context.model.plugin.put(plugin)
interface = mock.models.create_interface(
node.service,
'test',
@@ -51,6 +45,16 @@ def _mock_workflow(ctx, graph):
plugin=plugin)
)
node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+ graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
+ eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+ eng.execute()
+
+
+@workflow
+def _mock_workflow(ctx, graph):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
task = api.task.OperationTask(node, interface_name='test', operation_name='op')
graph.add_tasks(task)
return graph
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/orchestrator/context/test_workflow.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_workflow.py b/tests/orchestrator/context/test_workflow.py
index 3c35435..6d53c2a 100644
--- a/tests/orchestrator/context/test_workflow.py
+++ b/tests/orchestrator/context/test_workflow.py
@@ -17,11 +17,14 @@ from datetime import datetime
import pytest
-from aria import application_model_storage
+from aria import application_model_storage, workflow
from aria.orchestrator import context
from aria.storage import sql_mapi
-from tests import storage as test_storage
-from tests.mock import models
+from aria.orchestrator.workflows.executor import thread, process
+
+from tests import storage as test_storage, ROOT_DIR
+from ... import mock
+from . import execute
class TestWorkflowContext(object):
@@ -30,10 +33,10 @@ class TestWorkflowContext(object):
ctx = self._create_ctx(storage)
execution = storage.execution.get(ctx.execution.id) # pylint: disable=no-member
assert execution.service == storage.service.get_by_name(
- models.SERVICE_NAME)
- assert execution.workflow_name == models.WORKFLOW_NAME
+ mock.models.SERVICE_NAME)
+ assert execution.workflow_name == mock.models.WORKFLOW_NAME
assert execution.service_template == storage.service_template.get_by_name(
- models.SERVICE_TEMPLATE_NAME)
+ mock.models.SERVICE_TEMPLATE_NAME)
assert execution.status == storage.execution.model_cls.PENDING
assert execution.inputs == {}
assert execution.created_at <= datetime.utcnow()
@@ -49,27 +52,75 @@ class TestWorkflowContext(object):
:param storage:
:return WorkflowContext:
"""
- service = storage.service.get_by_name(models.SERVICE_NAME)
+ service = storage.service.get_by_name(mock.models.SERVICE_NAME)
return context.workflow.WorkflowContext(
name='simple_context',
model_storage=storage,
resource_storage=None,
service_id=service,
execution_id=storage.execution.list(filters=dict(service=service))[0].id,
- workflow_name=models.WORKFLOW_NAME,
- task_max_attempts=models.TASK_MAX_ATTEMPTS,
- task_retry_interval=models.TASK_RETRY_INTERVAL
+ workflow_name=mock.models.WORKFLOW_NAME,
+ task_max_attempts=mock.models.TASK_MAX_ATTEMPTS,
+ task_retry_interval=mock.models.TASK_RETRY_INTERVAL
)
+ @pytest.fixture
+ def storage(self):
+ workflow_storage = application_model_storage(
+ sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage)
+ workflow_storage.service_template.put(mock.models.create_service_template())
+ service_template = workflow_storage.service_template.get_by_name(
+ mock.models.SERVICE_TEMPLATE_NAME)
+ service = mock.models.create_service(service_template)
+ workflow_storage.service.put(service)
+ workflow_storage.execution.put(mock.models.create_execution(service))
+ yield workflow_storage
+ test_storage.release_sqlite_storage(workflow_storage)
+
+
+@pytest.fixture
+def ctx(tmpdir):
+ context = mock.context.simple(
+ str(tmpdir),
+ context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
+ )
+ yield context
+ test_storage.release_sqlite_storage(context.model)
+
+
+@pytest.fixture(params=[
+ (thread.ThreadExecutor, {}),
+ (process.ProcessExecutor, {'python_path': [ROOT_DIR]}),
+])
+def executor(request):
+ executor_cls, executor_kwargs = request.param
+ result = executor_cls(**executor_kwargs)
+ try:
+ yield result
+ finally:
+ result.close()
+
+
+def test_attribute_consumption(ctx, executor):
+
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+ node.attributes['key'] = ctx.model.attribute.model_cls.wrap('key', 'value')
+ node.attributes['key2'] = ctx.model.attribute.model_cls.wrap('key2', 'value_to_change')
+ ctx.model.node.update(node)
+
+ assert node.attributes['key'].value == 'value'
+ assert node.attributes['key2'].value == 'value_to_change'
+
+ @workflow
+ def basic_workflow(ctx, **_):
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+ node.attributes['new_key'] = 'new_value'
+ node.attributes['key2'] = 'changed_value'
+
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+ node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
-@pytest.fixture(scope='function')
-def storage():
- workflow_storage = application_model_storage(
- sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage)
- workflow_storage.service_template.put(models.create_service_template())
- service_template = workflow_storage.service_template.get_by_name(models.SERVICE_TEMPLATE_NAME)
- service = models.create_service(service_template)
- workflow_storage.service.put(service)
- workflow_storage.execution.put(models.create_execution(service))
- yield workflow_storage
- test_storage.release_sqlite_storage(workflow_storage)
+ assert len(node.attributes) == 3
+ assert node.attributes['key'].value == 'value'
+ assert node.attributes['new_key'].value == 'new_value'
+ assert node.attributes['key2'].value == 'changed_value'
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index d792a57..f667460 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -477,20 +477,22 @@ if __name__ == '__main__':
'input_as_env_var': env_var
})
+ node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface = mock.models.create_interface(
+ node.service,
+ 'test',
+ 'op',
+ operation_kwargs=dict(
+ function='{0}.{1}'.format(
+ operations.__name__,
+ operations.run_script_locally.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ workflow_context.model.node.update(node)
+
@workflow
def mock_workflow(ctx, graph):
- node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
- interface = mock.models.create_interface(
- node.service,
- 'test',
- 'op',
- operation_kwargs=dict(
- function='{0}.{1}'.format(
- operations.__name__,
- operations.run_script_locally.__name__),
- arguments=arguments)
- )
- node.interfaces[interface.name] = interface
graph.add_tasks(api.task.OperationTask(
node,
interface_name='test',
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 8b326e7..8c4dd2d 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -214,33 +214,33 @@ class TestWithActualSSHServer(object):
else:
operation = operations.run_script_with_ssh
+ node = self._workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ arguments = {
+ 'script_path': script_path,
+ 'fabric_env': _FABRIC_ENV,
+ 'process': process,
+ 'use_sudo': use_sudo,
+ 'custom_env_var': custom_input,
+ 'test_operation': '',
+ }
+ if hide_output:
+ arguments['hide_output'] = hide_output
+ if commands:
+ arguments['commands'] = commands
+ interface = mock.models.create_interface(
+ node.service,
+ 'test',
+ 'op',
+ operation_kwargs=dict(
+ function='{0}.{1}'.format(
+ operations.__name__,
+ operation.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+
@workflow
def mock_workflow(ctx, graph):
- node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
- arguments = {
- 'script_path': script_path,
- 'fabric_env': _FABRIC_ENV,
- 'process': process,
- 'use_sudo': use_sudo,
- 'custom_env_var': custom_input,
- 'test_operation': '',
- }
- if hide_output:
- arguments['hide_output'] = hide_output
- if commands:
- arguments['commands'] = commands
- interface = mock.models.create_interface(
- node.service,
- 'test',
- 'op',
- operation_kwargs=dict(
- function='{0}.{1}'.format(
- operations.__name__,
- operation.__name__),
- arguments=arguments)
- )
- node.interfaces[interface.name] = interface
-
ops = []
for test_operation in test_operations:
op_arguments = arguments.copy()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/orchestrator/workflows/builtin/test_execute_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/test_execute_operation.py b/tests/orchestrator/workflows/builtin/test_execute_operation.py
index 88818ca..8713e3c 100644
--- a/tests/orchestrator/workflows/builtin/test_execute_operation.py
+++ b/tests/orchestrator/workflows/builtin/test_execute_operation.py
@@ -56,12 +56,9 @@ def test_execute_operation(ctx):
)
assert len(execute_tasks) == 1
- assert execute_tasks[0].name == task.OperationTask.NAME_FORMAT.format(
- type='node',
- name=node.name,
- interface=interface_name,
- operation=operation_name
- )
+ assert getattr(execute_tasks[0].actor, '_wrapped', execute_tasks[0].actor) == node
+ assert execute_tasks[0].operation_name == operation_name
+ assert execute_tasks[0].interface_name == interface_name
# TODO: add more scenarios
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 6d2836c..0438544 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -55,12 +55,7 @@ class BaseTest(object):
tasks_graph=graph)
@staticmethod
- def _op(ctx,
- func,
- arguments=None,
- max_attempts=None,
- retry_interval=None,
- ignore_failure=None):
+ def _create_interface(ctx, func, arguments=None):
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
interface_name = 'aria.interfaces.lifecycle'
operation_kwargs = dict(function='{name}.{func.__name__}'.format(
@@ -72,6 +67,17 @@ class BaseTest(object):
interface = mock.models.create_interface(node.service, interface_name, operation_name,
operation_kwargs=operation_kwargs)
node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ return node, interface_name, operation_name
+
+ @staticmethod
+ def _op(node,
+ operation_name,
+ arguments=None,
+ max_attempts=None,
+ retry_interval=None,
+ ignore_failure=None):
return api.task.OperationTask(
node,
@@ -158,9 +164,11 @@ class TestEngine(BaseTest):
assert execution.status == models.Execution.SUCCEEDED
def test_single_task_successful_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(workflow_context, mock_success_task)
+
@workflow
def mock_workflow(ctx, graph):
- graph.add_tasks(self._op(ctx, func=mock_success_task))
+ graph.add_tasks(self._op(node, operation_name))
self._execute(
workflow_func=mock_workflow,
workflow_context=workflow_context,
@@ -170,9 +178,11 @@ class TestEngine(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 1
def test_single_task_failed_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(workflow_context, mock_failed_task)
+
@workflow
def mock_workflow(ctx, graph):
- graph.add_tasks(self._op(ctx, func=mock_failed_task))
+ graph.add_tasks(self._op(node, operation_name))
with pytest.raises(exceptions.ExecutorException):
self._execute(
workflow_func=mock_workflow,
@@ -187,10 +197,13 @@ class TestEngine(BaseTest):
assert execution.status == models.Execution.FAILED
def test_two_tasks_execution_order(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_ordered_task, {'counter': 1})
+
@workflow
def mock_workflow(ctx, graph):
- op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
- op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
+ op1 = self._op(node, operation_name, arguments={'counter': 1})
+ op2 = self._op(node, operation_name, arguments={'counter': 2})
graph.sequence(op1, op2)
self._execute(
workflow_func=mock_workflow,
@@ -202,11 +215,14 @@ class TestEngine(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_stub_and_subworkflow_execution(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_ordered_task, {'counter': 1})
+
@workflow
def sub_workflow(ctx, graph):
- op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
+ op1 = self._op(node, operation_name, arguments={'counter': 1})
op2 = api.task.StubTask()
- op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
+ op3 = self._op(node, operation_name, arguments={'counter': 2})
graph.sequence(op1, op2, op3)
@workflow
@@ -225,11 +241,13 @@ class TestCancel(BaseTest):
def test_cancel_started_execution(self, workflow_context, executor):
number_of_tasks = 100
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_sleep_task, {'seconds': 0.1})
@workflow
def mock_workflow(ctx, graph):
operations = (
- self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1))
+ self._op(node, operation_name, arguments=dict(seconds=0.1))
for _ in range(number_of_tasks)
)
return graph.sequence(*operations)
@@ -267,9 +285,12 @@ class TestCancel(BaseTest):
class TestRetries(BaseTest):
def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 1},
max_attempts=2)
graph.add_tasks(op)
@@ -283,9 +304,12 @@ class TestRetries(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 2},
max_attempts=2)
graph.add_tasks(op)
@@ -300,9 +324,11 @@ class TestRetries(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 1},
max_attempts=3)
graph.add_tasks(op)
@@ -316,9 +342,12 @@ class TestRetries(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 2},
max_attempts=3)
graph.add_tasks(op)
@@ -332,9 +361,11 @@ class TestRetries(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 3
def test_infinite_retries(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 1},
max_attempts=-1)
graph.add_tasks(op)
@@ -358,9 +389,11 @@ class TestRetries(BaseTest):
executor=executor)
def _test_retry_interval(self, retry_interval, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
arguments={'failure_count': 1},
max_attempts=2,
retry_interval=retry_interval)
@@ -378,9 +411,11 @@ class TestRetries(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_ignore_failure(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_conditional_failure_task, {'failure_count': 1})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_conditional_failure_task,
+ op = self._op(node, operation_name,
ignore_failure=True,
arguments={'failure_count': 100},
max_attempts=100)
@@ -401,10 +436,12 @@ class TestTaskRetryAndAbort(BaseTest):
def test_task_retry_default_interval(self, workflow_context, executor):
default_retry_interval = 0.1
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_retry, {'message': self.message})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_task_retry,
+ op = self._op(node, operation_name,
arguments={'message': self.message},
retry_interval=default_retry_interval,
max_attempts=2)
@@ -425,10 +462,13 @@ class TestTaskRetryAndAbort(BaseTest):
def test_task_retry_custom_interval(self, workflow_context, executor):
default_retry_interval = 100
custom_retry_interval = 0.1
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_retry, {'message': self.message,
+ 'retry_interval': custom_retry_interval})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_task_retry,
+ op = self._op(node, operation_name,
arguments={'message': self.message,
'retry_interval': custom_retry_interval},
retry_interval=default_retry_interval,
@@ -449,9 +489,11 @@ class TestTaskRetryAndAbort(BaseTest):
assert global_test_holder.get('sent_task_signal_calls') == 2
def test_task_abort(self, workflow_context, executor):
+ node, _, operation_name = self._create_interface(
+ workflow_context, mock_task_abort, {'message': self.message})
@workflow
def mock_workflow(ctx, graph):
- op = self._op(ctx, func=mock_task_abort,
+ op = self._op(node, operation_name,
arguments={'message': self.message},
retry_interval=100,
max_attempts=100)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 7969457..5f0b75f 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -32,19 +32,23 @@ def test_decorate_extension(context, executor):
def get_node(ctx):
return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ node = get_node(context)
+ interface_name = 'test_interface'
+ operation_name = 'operation'
+ interface = mock.models.create_interface(
+ context.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__),
+ arguments=arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
+
@workflow
def mock_workflow(ctx, graph):
node = get_node(ctx)
- interface_name = 'test_interface'
- operation_name = 'operation'
- interface = mock.models.create_interface(
- ctx.service,
- interface_name,
- operation_name,
- operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__),
- arguments=arguments)
- )
- node.interfaces[interface.name] = interface
task = api.task.OperationTask(
node,
interface_name=interface_name,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 2d80a3b..7dbcc5a 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -83,20 +83,22 @@ def test_apply_tracked_changes_during_an_operation(context, executor):
def _run_workflow(context, executor, op_func, arguments=None):
+ node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ interface_name = 'test_interface'
+ operation_name = 'operation'
+ wf_arguments = arguments or {}
+ interface = mock.models.create_interface(
+ context.service,
+ interface_name,
+ operation_name,
+ operation_kwargs=dict(function=_operation_mapping(op_func),
+ arguments=wf_arguments)
+ )
+ node.interfaces[interface.name] = interface
+ context.model.node.update(node)
+
@workflow
def mock_workflow(ctx, graph):
- node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
- interface_name = 'test_interface'
- operation_name = 'operation'
- wf_arguments = arguments or {}
- interface = mock.models.create_interface(
- ctx.service,
- interface_name,
- operation_name,
- operation_kwargs=dict(function=_operation_mapping(op_func),
- arguments=wf_arguments)
- )
- node.interfaces[interface.name] = interface
task = api.task.OperationTask(
node,
interface_name=interface_name,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cde4d20/tests/storage/test_collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_collection_instrumentation.py b/tests/storage/test_collection_instrumentation.py
new file mode 100644
index 0000000..e915421
--- /dev/null
+++ b/tests/storage/test_collection_instrumentation.py
@@ -0,0 +1,257 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.modeling import models
+from aria.storage import collection_instrumentation
+
+
+class MockActor(object):
+ def __init__(self):
+ self.dict_ = {}
+ self.list_ = []
+
+
+class MockMAPI(object):
+
+ def __init__(self):
+ pass
+
+ def put(self, *args, **kwargs):
+ pass
+
+ def update(self, *args, **kwargs):
+ pass
+
+
+class CollectionInstrumentation(object):
+
+ @pytest.fixture
+ def actor(self):
+ return MockActor()
+
+ @pytest.fixture
+ def model(self):
+ return MockMAPI()
+
+ @pytest.fixture
+ def dict_(self, actor, model):
+ return collection_instrumentation._InstrumentedDict(model, actor, 'dict_', models.Attribute)
+
+ @pytest.fixture
+ def list_(self, actor, model):
+ return collection_instrumentation._InstrumentedList(model, actor, 'list_', models.Attribute)
+
+
+class TestDict(CollectionInstrumentation):
+
+ def test_keys(self, actor, dict_):
+ dict_.update(
+ {
+ 'key1': models.Attribute.wrap('key1', 'value1'),
+ 'key2': models.Attribute.wrap('key2', 'value2')
+ }
+ )
+ assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
+
+ def test_values(self, actor, dict_):
+ dict_.update({
+ 'key1': models.Attribute.wrap('key1', 'value1'),
+ 'key2': models.Attribute.wrap('key2', 'value2')
+ })
+ assert (sorted(dict_.values()) ==
+ sorted(['value1', 'value2']) ==
+ sorted(v.value for v in actor.dict_.values()))
+
+ def test_items(self, dict_):
+ dict_.update({
+ 'key1': models.Attribute.wrap('key1', 'value1'),
+ 'key2': models.Attribute.wrap('key2', 'value2')
+ })
+ assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')])
+
+ def test_iter(self, actor, dict_):
+ dict_.update({
+ 'key1': models.Attribute.wrap('key1', 'value1'),
+ 'key2': models.Attribute.wrap('key2', 'value2')
+ })
+ assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
+
+ def test_bool(self, dict_):
+ assert not dict_
+ dict_.update({
+ 'key1': models.Attribute.wrap('key1', 'value1'),
+ 'key2': models.Attribute.wrap('key2', 'value2')
+ })
+ assert dict_
+
+ def test_set_item(self, actor, dict_):
+ dict_['key1'] = models.Attribute.wrap('key1', 'value1')
+ assert dict_['key1'] == 'value1' == actor.dict_['key1'].value
+ assert isinstance(actor.dict_['key1'], models.Attribute)
+
+ def test_nested(self, actor, dict_):
+ dict_['key'] = {}
+ assert isinstance(actor.dict_['key'], models.Attribute)
+ assert dict_['key'] == actor.dict_['key'].value == {}
+
+ dict_['key']['inner_key'] = 'value'
+
+ assert len(dict_) == 1
+ assert 'inner_key' in dict_['key']
+ assert dict_['key']['inner_key'] == 'value'
+ assert dict_['key'].keys() == ['inner_key']
+ assert dict_['key'].values() == ['value']
+ assert dict_['key'].items() == [('inner_key', 'value')]
+ assert isinstance(actor.dict_['key'], models.Attribute)
+ assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
+
+ dict_['key'].update({'updated_key': 'updated_value'})
+ assert len(dict_) == 1
+ assert 'updated_key' in dict_['key']
+ assert dict_['key']['updated_key'] == 'updated_value'
+ assert sorted(dict_['key'].keys()) == sorted(['inner_key', 'updated_key'])
+ assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value'])
+ assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'),
+ ('updated_key', 'updated_value')])
+ assert isinstance(actor.dict_['key'], models.Attribute)
+ assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
+
+ dict_.update({'key': 'override_value'})
+ assert len(dict_) == 1
+ assert 'key' in dict_
+ assert dict_['key'] == 'override_value'
+ assert len(actor.dict_) == 1
+ assert isinstance(actor.dict_['key'], models.Attribute)
+ assert actor.dict_['key'].value == 'override_value'
+
+ def test_get_item(self, actor, dict_):
+ dict_['key1'] = models.Attribute.wrap('key1', 'value1')
+ assert isinstance(actor.dict_['key1'], models.Attribute)
+
+ def test_update(self, actor, dict_):
+ dict_['key1'] = 'value1'
+
+ new_dict = {'key2': 'value2'}
+ dict_.update(new_dict)
+ assert len(dict_) == 2
+ assert dict_['key2'] == 'value2'
+ assert isinstance(actor.dict_['key2'], models.Attribute)
+
+ new_dict = {}
+ new_dict.update(dict_)
+ assert new_dict['key1'] == dict_['key1']
+
+ def test_copy(self, dict_):
+ dict_['key1'] = 'value1'
+
+ new_dict = dict_.copy()
+ assert new_dict is not dict_
+ assert new_dict == dict_
+
+ dict_['key1'] = 'value2'
+ assert new_dict['key1'] == 'value1'
+ assert dict_['key1'] == 'value2'
+
+ def test_clear(self, dict_):
+ dict_['key1'] = 'value1'
+ dict_.clear()
+
+ assert len(dict_) == 0
+
+
+class TestList(CollectionInstrumentation):
+
+ def test_append(self, actor, list_):
+ list_.append(models.Attribute.wrap('name', 'value1'))
+ list_.append('value2')
+ assert len(actor.list_) == 2
+ assert len(list_) == 2
+ assert isinstance(actor.list_[0], models.Attribute)
+ assert list_[0] == 'value1'
+
+ assert isinstance(actor.list_[1], models.Attribute)
+ assert list_[1] == 'value2'
+
+ list_[0] = 'new_value1'
+ list_[1] = 'new_value2'
+ assert isinstance(actor.list_[0], models.Attribute)
+ assert isinstance(actor.list_[1], models.Attribute)
+ assert list_[0] == 'new_value1'
+ assert list_[1] == 'new_value2'
+
+ def test_iter(self, list_):
+ list_.append('value1')
+ list_.append('value2')
+ assert sorted(list_) == sorted(['value1', 'value2'])
+
+ def test_insert(self, actor, list_):
+ list_.append('value1')
+ list_.insert(0, 'value2')
+ list_.insert(2, 'value3')
+ list_.insert(10, 'value4')
+ assert sorted(list_) == sorted(['value1', 'value2', 'value3', 'value4'])
+ assert len(actor.list_) == 4
+
+ def test_set(self, list_):
+ list_.append('value1')
+ list_.append('value2')
+
+ list_[1] = 'value3'
+ assert len(list_) == 2
+ assert sorted(list_) == sorted(['value1', 'value3'])
+
+ def test_insert_into_nested(self, actor, list_):
+ list_.append([])
+
+ list_[0].append('inner_item')
+ assert isinstance(actor.list_[0], models.Attribute)
+ assert len(list_) == 1
+ assert list_[0][0] == 'inner_item'
+
+ list_[0].append('new_item')
+ assert isinstance(actor.list_[0], models.Attribute)
+ assert len(list_) == 1
+ assert list_[0][1] == 'new_item'
+
+ assert list_[0] == ['inner_item', 'new_item']
+ assert ['inner_item', 'new_item'] == list_[0]
+
+
+class TestDictList(CollectionInstrumentation):
+ def test_dict_in_list(self, actor, list_):
+ list_.append({})
+ assert len(list_) == 1
+ assert isinstance(actor.list_[0], models.Attribute)
+ assert actor.list_[0].value == {}
+
+ list_[0]['key'] = 'value'
+ assert list_[0]['key'] == 'value'
+ assert len(actor.list_) == 1
+ assert isinstance(actor.list_[0], models.Attribute)
+ assert actor.list_[0].value['key'] == 'value'
+
+ def test_list_in_dict(self, actor, dict_):
+ dict_['key'] = []
+ assert len(dict_) == 1
+ assert isinstance(actor.dict_['key'], models.Attribute)
+ assert actor.dict_['key'].value == []
+
+ dict_['key'].append('value')
+ assert dict_['key'][0] == 'value'
+ assert len(actor.dict_) == 1
+ assert isinstance(actor.dict_['key'], models.Attribute)
+ assert actor.dict_['key'].value[0] == 'value'