You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by em...@apache.org on 2017/06/14 17:53:43 UTC

[2/5] incubator-ariatosca git commit: ARIA-276 Support model instrumentation for workflows

ARIA-276 Support model instrumentation for workflows


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2149a5ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2149a5ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2149a5ee

Branch: refs/heads/ARIA-54-prepare-aria-packaging
Commit: 2149a5ee0c4656a253f54db20f279197961588c1
Parents: 1e883c5
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu Jun 8 09:52:31 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 13 14:34:41 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/context/common.py             |   7 +
 aria/orchestrator/context/operation.py          |   7 -
 aria/orchestrator/decorators.py                 |   5 +-
 aria/orchestrator/workflows/api/task.py         |   2 -
 aria/orchestrator/workflows/core/task.py        |  12 +-
 aria/storage/collection_instrumentation.py      |  46 +--
 .../context/test_collection_instrumentation.py  | 325 -------------------
 .../context/test_context_instrumentation.py     | 108 ++++++
 tests/orchestrator/context/test_serialize.py    |  20 +-
 tests/orchestrator/context/test_workflow.py     |  93 ++++--
 .../orchestrator/execution_plugin/test_local.py |  26 +-
 tests/orchestrator/execution_plugin/test_ssh.py |  50 +--
 .../workflows/builtin/test_execute_operation.py |   9 +-
 .../orchestrator/workflows/core/test_engine.py  |  88 +++--
 .../executor/test_process_executor_extension.py |  24 +-
 .../test_process_executor_tracked_changes.py    |  26 +-
 .../storage/test_collection_instrumentation.py  | 257 +++++++++++++++
 17 files changed, 627 insertions(+), 478 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index c98e026..f4df317 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -36,6 +36,13 @@ class BaseContext(object):
     Base context object for workflow and operation
     """
 
+    INSTRUMENTATION_FIELDS = (
+        modeling.models.Node.attributes,
+        modeling.models.Node.properties,
+        modeling.models.NodeTemplate.attributes,
+        modeling.models.NodeTemplate.properties
+    )
+
     class PrefixedLogger(object):
         def __init__(self, base_logger, task_id=None):
             self._logger = base_logger

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index af7220d..efdc04d 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -29,13 +29,6 @@ class BaseOperationContext(common.BaseContext):
     Context object used during operation creation and execution
     """
 
-    INSTRUMENTATION_FIELDS = (
-        aria.modeling.models.Node.attributes,
-        aria.modeling.models.Node.properties,
-        aria.modeling.models.NodeTemplate.attributes,
-        aria.modeling.models.NodeTemplate.properties
-    )
-
     def __init__(self, task_id, actor_id, **kwargs):
         self._task_id = task_id
         self._actor_id = actor_id

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/orchestrator/decorators.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py
index 80f6962..389bfb8 100644
--- a/aria/orchestrator/decorators.py
+++ b/aria/orchestrator/decorators.py
@@ -49,8 +49,9 @@ def workflow(func=None, suffix_template=''):
         workflow_parameters.setdefault('ctx', ctx)
         workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name))
         validate_function_arguments(func, workflow_parameters)
-        with context.workflow.current.push(ctx):
-            func(**workflow_parameters)
+        with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS):
+            with context.workflow.current.push(ctx):
+                func(**workflow_parameters)
         return workflow_parameters['graph']
     return _wrapper
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index bcba56e..ca125a8 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -108,8 +108,6 @@ class OperationTask(BaseTask):
                 ``interface_name`` and ``operation_name`` to not refer to an operation on the actor
         """
 
-        assert isinstance(actor, (models.Node, models.Relationship))
-
         # Creating OperationTask directly should raise an error when there is no
         # interface/operation.
         if not has_operation(actor, interface_name, operation_name):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 72d83ea..d732f09 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -124,20 +124,22 @@ class OperationTask(BaseTask):
         self.operation_name = api_task.operation_name
         model_storage = api_task._workflow_context.model
 
+        actor = getattr(api_task.actor, '_wrapped', api_task.actor)
+
         base_task_model = model_storage.task.model_cls
-        if isinstance(api_task.actor, models.Node):
+        if isinstance(actor, models.Node):
             context_cls = operation_context.NodeOperationContext
             create_task_model = base_task_model.for_node
-        elif isinstance(api_task.actor, models.Relationship):
+        elif isinstance(actor, models.Relationship):
             context_cls = operation_context.RelationshipOperationContext
             create_task_model = base_task_model.for_relationship
         else:
             raise RuntimeError('No operation context could be created for {actor.model_cls}'
-                               .format(actor=api_task.actor))
+                               .format(actor=actor))
 
         task_model = create_task_model(
             name=api_task.name,
-            actor=api_task.actor,
+            actor=actor,
             status=base_task_model.PENDING,
             max_attempts=api_task.max_attempts,
             retry_interval=api_task.retry_interval,
@@ -156,7 +158,7 @@ class OperationTask(BaseTask):
                                 resource_storage=self._workflow_context.resource,
                                 service_id=self._workflow_context._service_id,
                                 task_id=task_model.id,
-                                actor_id=api_task.actor.id,
+                                actor_id=actor.id,
                                 execution_id=self._workflow_context._execution_id,
                                 workdir=self._workflow_context._workdir)
         self._task_id = task_model.id

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/storage/collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/collection_instrumentation.py b/aria/storage/collection_instrumentation.py
index 27d8322..454f97a 100644
--- a/aria/storage/collection_instrumentation.py
+++ b/aria/storage/collection_instrumentation.py
@@ -198,23 +198,28 @@ class _InstrumentedList(_InstrumentedCollection, list):
         return list(self)
 
 
-class _InstrumentedModel(object):
+class _WrappedBase(object):
 
-    def __init__(self, original_model, mapi, instrumentation):
+    def __init__(self, wrapped, instrumentation):
+        self._wrapped = wrapped
+        self._instrumentation = instrumentation
+
+
+class _InstrumentedModel(_WrappedBase):
+
+    def __init__(self, mapi, *args, **kwargs):
         """
         The original model
-        :param original_model: the model to be instrumented
+        :param wrapped: the model to be instrumented
         :param mapi: the mapi for that model
         """
-        super(_InstrumentedModel, self).__init__()
-        self._original_model = original_model
+        super(_InstrumentedModel, self).__init__(*args, **kwargs)
         self._mapi = mapi
-        self._instrumentation = instrumentation
         self._apply_instrumentation()
 
     def __getattr__(self, item):
-        return_value = getattr(self._original_model, item)
-        if isinstance(return_value, self._original_model.__class__):
+        return_value = getattr(self._wrapped, item)
+        if isinstance(return_value, self._wrapped.__class__):
             return _create_instrumented_model(return_value, self._mapi, self._instrumentation)
         if isinstance(return_value, (list, dict)):
             return _create_wrapped_model(return_value, self._mapi, self._instrumentation)
@@ -224,7 +229,7 @@ class _InstrumentedModel(object):
         for field in self._instrumentation:
             field_name = field.key
             field_cls = field.mapper.class_
-            field = getattr(self._original_model, field_name)
+            field = getattr(self._wrapped, field_name)
 
             # Preserve the original value. e.g. original attributes would be located under
             # _attributes
@@ -241,20 +246,20 @@ class _InstrumentedModel(object):
                     "ARIA supports instrumentation for dict and list. Field {field} of the "
                     "class {model} is of {type} type.".format(
                         field=field,
-                        model=self._original_model,
+                        model=self._wrapped,
                         type=type(field)))
 
             instrumented_class = instrumentation_cls(seq=field,
-                                                     parent=self._original_model,
+                                                     parent=self._wrapped,
                                                      mapi=self._mapi,
                                                      field_name=field_name,
                                                      field_cls=field_cls)
             setattr(self, field_name, instrumented_class)
 
 
-class _WrappedModel(object):
+class _WrappedModel(_WrappedBase):
 
-    def __init__(self, wrapped, instrumentation, **kwargs):
+    def __init__(self, instrumentation_kwargs, *args, **kwargs):
         """
 
         :param instrumented_cls: The class to be instrumented
@@ -262,9 +267,8 @@ class _WrappedModel(object):
         :param wrapped: the currently wrapped instance
         :param kwargs: and kwargs to the passed to the instrumented class.
         """
-        self._kwargs = kwargs
-        self._instrumentation = instrumentation
-        self._wrapped = wrapped
+        super(_WrappedModel, self).__init__(*args, **kwargs)
+        self._kwargs = instrumentation_kwargs
 
     def _wrap(self, value):
         if value.__class__ in (class_.class_ for class_ in self._instrumentation):
@@ -286,16 +290,18 @@ class _WrappedModel(object):
         return self._wrap(self._wrapped[item])
 
 
-def _create_instrumented_model(original_model, mapi, instrumentation, **kwargs):
+def _create_instrumented_model(original_model, mapi, instrumentation):
     return type('Instrumented{0}'.format(original_model.__class__.__name__),
                 (_InstrumentedModel,),
-                {})(original_model, mapi, instrumentation, **kwargs)
+                {})(wrapped=original_model, instrumentation=instrumentation, mapi=mapi)
 
 
-def _create_wrapped_model(original_model, mapi, instrumentation, **kwargs):
+def _create_wrapped_model(original_model, mapi, instrumentation):
     return type('Wrapped{0}'.format(original_model.__class__.__name__),
                 (_WrappedModel, ),
-                {})(original_model, instrumentation, mapi=mapi, **kwargs)
+                {})(wrapped=original_model,
+                    instrumentation=instrumentation,
+                    instrumentation_kwargs=dict(mapi=mapi))
 
 
 def instrument(instrumentation, original_model, mapi):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/context/test_collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_collection_instrumentation.py b/tests/orchestrator/context/test_collection_instrumentation.py
deleted file mode 100644
index ae3e8ac..0000000
--- a/tests/orchestrator/context/test_collection_instrumentation.py
+++ /dev/null
@@ -1,325 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import pytest
-
-from aria.modeling import models
-from aria.storage import collection_instrumentation
-from aria.orchestrator.context import operation
-
-from tests import (
-    mock,
-    storage
-)
-
-
-class MockActor(object):
-    def __init__(self):
-        self.dict_ = {}
-        self.list_ = []
-
-
-class MockMAPI(object):
-
-    def __init__(self):
-        pass
-
-    def put(self, *args, **kwargs):
-        pass
-
-    def update(self, *args, **kwargs):
-        pass
-
-
-class CollectionInstrumentation(object):
-
-    @pytest.fixture
-    def actor(self):
-        return MockActor()
-
-    @pytest.fixture
-    def model(self):
-        return MockMAPI()
-
-    @pytest.fixture
-    def dict_(self, actor, model):
-        return collection_instrumentation._InstrumentedDict(model, actor, 'dict_', models.Attribute)
-
-    @pytest.fixture
-    def list_(self, actor, model):
-        return collection_instrumentation._InstrumentedList(model, actor, 'list_', models.Attribute)
-
-
-class TestDict(CollectionInstrumentation):
-
-    def test_keys(self, actor, dict_):
-        dict_.update(
-            {
-                'key1': models.Attribute.wrap('key1', 'value1'),
-                'key2': models.Attribute.wrap('key2', 'value2')
-            }
-        )
-        assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
-
-    def test_values(self, actor, dict_):
-        dict_.update({
-            'key1': models.Attribute.wrap('key1', 'value1'),
-            'key2': models.Attribute.wrap('key1', 'value2')
-        })
-        assert (sorted(dict_.values()) ==
-                sorted(['value1', 'value2']) ==
-                sorted(v.value for v in actor.dict_.values()))
-
-    def test_items(self, dict_):
-        dict_.update({
-            'key1': models.Attribute.wrap('key1', 'value1'),
-            'key2': models.Attribute.wrap('key1', 'value2')
-        })
-        assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')])
-
-    def test_iter(self, actor, dict_):
-        dict_.update({
-            'key1': models.Attribute.wrap('key1', 'value1'),
-            'key2': models.Attribute.wrap('key1', 'value2')
-        })
-        assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
-
-    def test_bool(self, dict_):
-        assert not dict_
-        dict_.update({
-            'key1': models.Attribute.wrap('key1', 'value1'),
-            'key2': models.Attribute.wrap('key1', 'value2')
-        })
-        assert dict_
-
-    def test_set_item(self, actor, dict_):
-        dict_['key1'] = models.Attribute.wrap('key1', 'value1')
-        assert dict_['key1'] == 'value1' == actor.dict_['key1'].value
-        assert isinstance(actor.dict_['key1'], models.Attribute)
-
-    def test_nested(self, actor, dict_):
-        dict_['key'] = {}
-        assert isinstance(actor.dict_['key'], models.Attribute)
-        assert dict_['key'] == actor.dict_['key'].value == {}
-
-        dict_['key']['inner_key'] = 'value'
-
-        assert len(dict_) == 1
-        assert 'inner_key' in dict_['key']
-        assert dict_['key']['inner_key'] == 'value'
-        assert dict_['key'].keys() == ['inner_key']
-        assert dict_['key'].values() == ['value']
-        assert dict_['key'].items() == [('inner_key', 'value')]
-        assert isinstance(actor.dict_['key'], models.Attribute)
-        assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
-
-        dict_['key'].update({'updated_key': 'updated_value'})
-        assert len(dict_) == 1
-        assert 'updated_key' in dict_['key']
-        assert dict_['key']['updated_key'] == 'updated_value'
-        assert sorted(dict_['key'].keys()) == sorted(['inner_key', 'updated_key'])
-        assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value'])
-        assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'),
-                                                       ('updated_key', 'updated_value')])
-        assert isinstance(actor.dict_['key'], models.Attribute)
-        assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
-
-        dict_.update({'key': 'override_value'})
-        assert len(dict_) == 1
-        assert 'key' in dict_
-        assert dict_['key'] == 'override_value'
-        assert len(actor.dict_) == 1
-        assert isinstance(actor.dict_['key'], models.Attribute)
-        assert actor.dict_['key'].value == 'override_value'
-
-    def test_get_item(self, actor, dict_):
-        dict_['key1'] = models.Attribute.wrap('key1', 'value1')
-        assert isinstance(actor.dict_['key1'], models.Attribute)
-
-    def test_update(self, actor, dict_):
-        dict_['key1'] = 'value1'
-
-        new_dict = {'key2': 'value2'}
-        dict_.update(new_dict)
-        assert len(dict_) == 2
-        assert dict_['key2'] == 'value2'
-        assert isinstance(actor.dict_['key2'], models.Attribute)
-
-        new_dict = {}
-        new_dict.update(dict_)
-        assert new_dict['key1'] == dict_['key1']
-
-    def test_copy(self, dict_):
-        dict_['key1'] = 'value1'
-
-        new_dict = dict_.copy()
-        assert new_dict is not dict_
-        assert new_dict == dict_
-
-        dict_['key1'] = 'value2'
-        assert new_dict['key1'] == 'value1'
-        assert dict_['key1'] == 'value2'
-
-    def test_clear(self, dict_):
-        dict_['key1'] = 'value1'
-        dict_.clear()
-
-        assert len(dict_) == 0
-
-
-class TestList(CollectionInstrumentation):
-
-    def test_append(self, actor, list_):
-        list_.append(models.Attribute.wrap('name', 'value1'))
-        list_.append('value2')
-        assert len(actor.list_) == 2
-        assert len(list_) == 2
-        assert isinstance(actor.list_[0], models.Attribute)
-        assert list_[0] == 'value1'
-
-        assert isinstance(actor.list_[1], models.Attribute)
-        assert list_[1] == 'value2'
-
-        list_[0] = 'new_value1'
-        list_[1] = 'new_value2'
-        assert isinstance(actor.list_[1], models.Attribute)
-        assert isinstance(actor.list_[1], models.Attribute)
-        assert list_[0] == 'new_value1'
-        assert list_[1] == 'new_value2'
-
-    def test_iter(self, list_):
-        list_.append('value1')
-        list_.append('value2')
-        assert sorted(list_) == sorted(['value1', 'value2'])
-
-    def test_insert(self, actor, list_):
-        list_.append('value1')
-        list_.insert(0, 'value2')
-        list_.insert(2, 'value3')
-        list_.insert(10, 'value4')
-        assert sorted(list_) == sorted(['value1', 'value2', 'value3', 'value4'])
-        assert len(actor.list_) == 4
-
-    def test_set(self, list_):
-        list_.append('value1')
-        list_.append('value2')
-
-        list_[1] = 'value3'
-        assert len(list_) == 2
-        assert sorted(list_) == sorted(['value1', 'value3'])
-
-    def test_insert_into_nested(self, actor, list_):
-        list_.append([])
-
-        list_[0].append('inner_item')
-        assert isinstance(actor.list_[0], models.Attribute)
-        assert len(list_) == 1
-        assert list_[0][0] == 'inner_item'
-
-        list_[0].append('new_item')
-        assert isinstance(actor.list_[0], models.Attribute)
-        assert len(list_) == 1
-        assert list_[0][1] == 'new_item'
-
-        assert list_[0] == ['inner_item', 'new_item']
-        assert ['inner_item', 'new_item'] == list_[0]
-
-
-class TestDictList(CollectionInstrumentation):
-    def test_dict_in_list(self, actor, list_):
-        list_.append({})
-        assert len(list_) == 1
-        assert isinstance(actor.list_[0], models.Attribute)
-        assert actor.list_[0].value == {}
-
-        list_[0]['key'] = 'value'
-        assert list_[0]['key'] == 'value'
-        assert len(actor.list_) == 1
-        assert isinstance(actor.list_[0], models.Attribute)
-        assert actor.list_[0].value['key'] == 'value'
-
-    def test_list_in_dict(self, actor, dict_):
-        dict_['key'] = []
-        assert len(dict_) == 1
-        assert isinstance(actor.dict_['key'], models.Attribute)
-        assert actor.dict_['key'].value == []
-
-        dict_['key'].append('value')
-        assert dict_['key'][0] == 'value'
-        assert len(actor.dict_) == 1
-        assert isinstance(actor.dict_['key'], models.Attribute)
-        assert actor.dict_['key'].value[0] == 'value'
-
-
-class TestModelInstrumentation(object):
-
-    @pytest.fixture
-    def workflow_ctx(self, tmpdir):
-        context = mock.context.simple(str(tmpdir), inmemory=True)
-        yield context
-        storage.release_sqlite_storage(context.model)
-
-    def test_attributes_access(self, workflow_ctx):
-        node = workflow_ctx.model.node.list()[0]
-        task = models.Task(node=node)
-        workflow_ctx.model.task.put(task)
-
-        ctx = operation.NodeOperationContext(
-            task.id, node.id, name='', service_id=workflow_ctx.model.service.list()[0].id,
-            model_storage=workflow_ctx.model, resource_storage=workflow_ctx.resource,
-            execution_id=1)
-
-        def _run_assertions(is_under_ctx):
-            def ctx_assert(expr):
-                if is_under_ctx:
-                    assert expr
-                else:
-                    assert not expr
-
-            ctx_assert(isinstance(ctx.node.attributes,
-                                  collection_instrumentation._InstrumentedDict))
-            assert not isinstance(ctx.node.properties,
-                                  collection_instrumentation._InstrumentedCollection)
-
-            for rel in ctx.node.inbound_relationships:
-                ctx_assert(isinstance(rel, collection_instrumentation._WrappedModel))
-                ctx_assert(isinstance(rel.source_node.attributes,
-                                      collection_instrumentation._InstrumentedDict))
-                ctx_assert(isinstance(rel.target_node.attributes,
-                                      collection_instrumentation._InstrumentedDict))
-
-            for node in ctx.model.node:
-                ctx_assert(isinstance(node.attributes,
-                                      collection_instrumentation._InstrumentedDict))
-                assert not isinstance(node.properties,
-                                      collection_instrumentation._InstrumentedCollection)
-
-            for rel in ctx.model.relationship:
-                ctx_assert(isinstance(rel, collection_instrumentation._WrappedModel))
-
-                ctx_assert(isinstance(rel.source_node.attributes,
-                                      collection_instrumentation._InstrumentedDict))
-                ctx_assert(isinstance(rel.target_node.attributes,
-                                      collection_instrumentation._InstrumentedDict))
-
-                assert not isinstance(rel.source_node.properties,
-                                      collection_instrumentation._InstrumentedCollection)
-                assert not isinstance(rel.target_node.properties,
-                                      collection_instrumentation._InstrumentedCollection)
-
-        with ctx.model.instrument(models.Node.attributes):
-            _run_assertions(True)
-
-        _run_assertions(False)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/context/test_context_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_context_instrumentation.py b/tests/orchestrator/context/test_context_instrumentation.py
new file mode 100644
index 0000000..6cc8096
--- /dev/null
+++ b/tests/orchestrator/context/test_context_instrumentation.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.modeling import models
+from aria.storage import collection_instrumentation
+from aria.orchestrator.context import operation
+
+from tests import (
+    mock,
+    storage
+)
+
+
+class TestContextInstrumentation(object):
+
+    @pytest.fixture
+    def workflow_ctx(self, tmpdir):
+        context = mock.context.simple(str(tmpdir), inmemory=True)
+        yield context
+        storage.release_sqlite_storage(context.model)
+
+    def test_workflow_context_instrumentation(self, workflow_ctx):
+        with workflow_ctx.model.instrument(models.Node.attributes):
+            self._run_common_assertions(workflow_ctx, True)
+        self._run_common_assertions(workflow_ctx, False)
+
+    def test_operation_context_instrumentation(self, workflow_ctx):
+        node = workflow_ctx.model.node.list()[0]
+        task = models.Task(node=node)
+        workflow_ctx.model.task.put(task)
+
+        ctx = operation.NodeOperationContext(
+            task.id, node.id, name='', service_id=workflow_ctx.model.service.list()[0].id,
+            model_storage=workflow_ctx.model, resource_storage=workflow_ctx.resource,
+            execution_id=1)
+
+        with ctx.model.instrument(models.Node.attributes):
+            self._run_op_assertions(ctx, True)
+            self._run_common_assertions(ctx, True)
+
+        self._run_op_assertions(ctx, False)
+        self._run_common_assertions(ctx, False)
+
+    @staticmethod
+    def ctx_assert(expr, is_under_ctx):
+        if is_under_ctx:
+            assert expr
+        else:
+            assert not expr
+
+    def _run_op_assertions(self, ctx, is_under_ctx):
+        self.ctx_assert(isinstance(ctx.node.attributes,
+                                   collection_instrumentation._InstrumentedDict), is_under_ctx)
+        assert not isinstance(ctx.node.properties,
+                              collection_instrumentation._InstrumentedCollection)
+
+        for rel in ctx.node.inbound_relationships:
+            self.ctx_assert(
+                isinstance(rel, collection_instrumentation._WrappedModel), is_under_ctx)
+            self.ctx_assert(
+                isinstance(rel.source_node.attributes,
+                           collection_instrumentation._InstrumentedDict),
+                is_under_ctx)
+            self.ctx_assert(
+                isinstance(rel.target_node.attributes,
+                           collection_instrumentation._InstrumentedDict),
+                is_under_ctx)
+
+    def _run_common_assertions(self, ctx, is_under_ctx):
+
+        for node in ctx.model.node:
+            self.ctx_assert(
+                isinstance(node.attributes, collection_instrumentation._InstrumentedDict),
+                is_under_ctx)
+            assert not isinstance(node.properties,
+                                  collection_instrumentation._InstrumentedCollection)
+
+        for rel in ctx.model.relationship:
+            self.ctx_assert(
+                isinstance(rel, collection_instrumentation._WrappedModel), is_under_ctx)
+
+            self.ctx_assert(
+                isinstance(rel.source_node.attributes,
+                           collection_instrumentation._InstrumentedDict),
+                is_under_ctx)
+            self.ctx_assert(
+                isinstance(rel.target_node.attributes,
+                           collection_instrumentation._InstrumentedDict),
+                is_under_ctx)
+
+            assert not isinstance(rel.source_node.properties,
+                                  collection_instrumentation._InstrumentedCollection)
+            assert not isinstance(rel.target_node.properties,
+                                  collection_instrumentation._InstrumentedCollection)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 4db7bf4..0919e81 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -33,16 +33,10 @@ def test_serialize_operation_context(context, executor, tmpdir):
     test_file.write(TEST_FILE_CONTENT)
     resource = context.resource
     resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
-    graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
-    eng.execute()
-
 
-@workflow
-def _mock_workflow(ctx, graph):
-    node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+    node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
     plugin = mock.models.create_plugin()
-    ctx.model.plugin.put(plugin)
+    context.model.plugin.put(plugin)
     interface = mock.models.create_interface(
         node.service,
         'test',
@@ -51,6 +45,16 @@ def _mock_workflow(ctx, graph):
                               plugin=plugin)
     )
     node.interfaces[interface.name] = interface
+    context.model.node.update(node)
+
+    graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
+    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+    eng.execute()
+
+
+@workflow
+def _mock_workflow(ctx, graph):
+    node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
     task = api.task.OperationTask(node, interface_name='test', operation_name='op')
     graph.add_tasks(task)
     return graph

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/context/test_workflow.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_workflow.py b/tests/orchestrator/context/test_workflow.py
index 3c35435..6d53c2a 100644
--- a/tests/orchestrator/context/test_workflow.py
+++ b/tests/orchestrator/context/test_workflow.py
@@ -17,11 +17,14 @@ from datetime import datetime
 
 import pytest
 
-from aria import application_model_storage
+from aria import application_model_storage, workflow
 from aria.orchestrator import context
 from aria.storage import sql_mapi
-from tests import storage as test_storage
-from tests.mock import models
+from aria.orchestrator.workflows.executor import thread, process
+
+from tests import storage as test_storage, ROOT_DIR
+from ... import mock
+from . import execute
 
 
 class TestWorkflowContext(object):
@@ -30,10 +33,10 @@ class TestWorkflowContext(object):
         ctx = self._create_ctx(storage)
         execution = storage.execution.get(ctx.execution.id)             # pylint: disable=no-member
         assert execution.service == storage.service.get_by_name(
-            models.SERVICE_NAME)
-        assert execution.workflow_name == models.WORKFLOW_NAME
+            mock.models.SERVICE_NAME)
+        assert execution.workflow_name == mock.models.WORKFLOW_NAME
         assert execution.service_template == storage.service_template.get_by_name(
-            models.SERVICE_TEMPLATE_NAME)
+            mock.models.SERVICE_TEMPLATE_NAME)
         assert execution.status == storage.execution.model_cls.PENDING
         assert execution.inputs == {}
         assert execution.created_at <= datetime.utcnow()
@@ -49,27 +52,75 @@ class TestWorkflowContext(object):
         :param storage:
         :return WorkflowContext:
         """
-        service = storage.service.get_by_name(models.SERVICE_NAME)
+        service = storage.service.get_by_name(mock.models.SERVICE_NAME)
         return context.workflow.WorkflowContext(
             name='simple_context',
             model_storage=storage,
             resource_storage=None,
             service_id=service,
             execution_id=storage.execution.list(filters=dict(service=service))[0].id,
-            workflow_name=models.WORKFLOW_NAME,
-            task_max_attempts=models.TASK_MAX_ATTEMPTS,
-            task_retry_interval=models.TASK_RETRY_INTERVAL
+            workflow_name=mock.models.WORKFLOW_NAME,
+            task_max_attempts=mock.models.TASK_MAX_ATTEMPTS,
+            task_retry_interval=mock.models.TASK_RETRY_INTERVAL
         )
 
+    @pytest.fixture
+    def storage(self):
+        workflow_storage = application_model_storage(
+            sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage)
+        workflow_storage.service_template.put(mock.models.create_service_template())
+        service_template = workflow_storage.service_template.get_by_name(
+            mock.models.SERVICE_TEMPLATE_NAME)
+        service = mock.models.create_service(service_template)
+        workflow_storage.service.put(service)
+        workflow_storage.execution.put(mock.models.create_execution(service))
+        yield workflow_storage
+        test_storage.release_sqlite_storage(workflow_storage)
+
+
+@pytest.fixture
+def ctx(tmpdir):
+    context = mock.context.simple(
+        str(tmpdir),
+        context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
+    )
+    yield context
+    test_storage.release_sqlite_storage(context.model)
+
+
+@pytest.fixture(params=[
+    (thread.ThreadExecutor, {}),
+    (process.ProcessExecutor, {'python_path': [ROOT_DIR]}),
+])
+def executor(request):
+    executor_cls, executor_kwargs = request.param
+    result = executor_cls(**executor_kwargs)
+    try:
+        yield result
+    finally:
+        result.close()
+
+
+def test_attribute_consumption(ctx, executor):
+
+    node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+    node.attributes['key'] = ctx.model.attribute.model_cls.wrap('key', 'value')
+    node.attributes['key2'] = ctx.model.attribute.model_cls.wrap('key2', 'value_to_change')
+    ctx.model.node.update(node)
+
+    assert node.attributes['key'].value == 'value'
+    assert node.attributes['key2'].value == 'value_to_change'
+
+    @workflow
+    def basic_workflow(ctx, **_):
+        node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+        node.attributes['new_key'] = 'new_value'
+        node.attributes['key2'] = 'changed_value'
+
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
 
-@pytest.fixture(scope='function')
-def storage():
-    workflow_storage = application_model_storage(
-        sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage)
-    workflow_storage.service_template.put(models.create_service_template())
-    service_template = workflow_storage.service_template.get_by_name(models.SERVICE_TEMPLATE_NAME)
-    service = models.create_service(service_template)
-    workflow_storage.service.put(service)
-    workflow_storage.execution.put(models.create_execution(service))
-    yield workflow_storage
-    test_storage.release_sqlite_storage(workflow_storage)
+    assert len(node.attributes) == 3
+    assert node.attributes['key'].value == 'value'
+    assert node.attributes['new_key'].value == 'new_value'
+    assert node.attributes['key2'].value == 'changed_value'

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index d792a57..f667460 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -477,20 +477,22 @@ if __name__ == '__main__':
             'input_as_env_var': env_var
         })
 
+        node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+        interface = mock.models.create_interface(
+            node.service,
+            'test',
+            'op',
+            operation_kwargs=dict(
+                function='{0}.{1}'.format(
+                    operations.__name__,
+                    operations.run_script_locally.__name__),
+                arguments=arguments)
+        )
+        node.interfaces[interface.name] = interface
+        workflow_context.model.node.update(node)
+
         @workflow
         def mock_workflow(ctx, graph):
-            node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-            interface = mock.models.create_interface(
-                node.service,
-                'test',
-                'op',
-                operation_kwargs=dict(
-                    function='{0}.{1}'.format(
-                        operations.__name__,
-                        operations.run_script_locally.__name__),
-                    arguments=arguments)
-            )
-            node.interfaces[interface.name] = interface
             graph.add_tasks(api.task.OperationTask(
                 node,
                 interface_name='test',

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 8b326e7..8c4dd2d 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -214,33 +214,33 @@ class TestWithActualSSHServer(object):
         else:
             operation = operations.run_script_with_ssh
 
+        node = self._workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+        arguments = {
+            'script_path': script_path,
+            'fabric_env': _FABRIC_ENV,
+            'process': process,
+            'use_sudo': use_sudo,
+            'custom_env_var': custom_input,
+            'test_operation': '',
+        }
+        if hide_output:
+            arguments['hide_output'] = hide_output
+        if commands:
+            arguments['commands'] = commands
+        interface = mock.models.create_interface(
+            node.service,
+            'test',
+            'op',
+            operation_kwargs=dict(
+                function='{0}.{1}'.format(
+                    operations.__name__,
+                    operation.__name__),
+                arguments=arguments)
+        )
+        node.interfaces[interface.name] = interface
+
         @workflow
         def mock_workflow(ctx, graph):
-            node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-            arguments = {
-                'script_path': script_path,
-                'fabric_env': _FABRIC_ENV,
-                'process': process,
-                'use_sudo': use_sudo,
-                'custom_env_var': custom_input,
-                'test_operation': '',
-            }
-            if hide_output:
-                arguments['hide_output'] = hide_output
-            if commands:
-                arguments['commands'] = commands
-            interface = mock.models.create_interface(
-                node.service,
-                'test',
-                'op',
-                operation_kwargs=dict(
-                    function='{0}.{1}'.format(
-                        operations.__name__,
-                        operation.__name__),
-                    arguments=arguments)
-            )
-            node.interfaces[interface.name] = interface
-
             ops = []
             for test_operation in test_operations:
                 op_arguments = arguments.copy()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/workflows/builtin/test_execute_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/test_execute_operation.py b/tests/orchestrator/workflows/builtin/test_execute_operation.py
index 88818ca..8713e3c 100644
--- a/tests/orchestrator/workflows/builtin/test_execute_operation.py
+++ b/tests/orchestrator/workflows/builtin/test_execute_operation.py
@@ -56,12 +56,9 @@ def test_execute_operation(ctx):
     )
 
     assert len(execute_tasks) == 1
-    assert execute_tasks[0].name == task.OperationTask.NAME_FORMAT.format(
-        type='node',
-        name=node.name,
-        interface=interface_name,
-        operation=operation_name
-    )
+    assert getattr(execute_tasks[0].actor, '_wrapped', execute_tasks[0].actor) == node
+    assert execute_tasks[0].operation_name == operation_name
+    assert execute_tasks[0].interface_name == interface_name
 
 
 # TODO: add more scenarios

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 6d2836c..0438544 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -55,12 +55,7 @@ class BaseTest(object):
                              tasks_graph=graph)
 
     @staticmethod
-    def _op(ctx,
-            func,
-            arguments=None,
-            max_attempts=None,
-            retry_interval=None,
-            ignore_failure=None):
+    def _create_interface(ctx, func, arguments=None):
         node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
         interface_name = 'aria.interfaces.lifecycle'
         operation_kwargs = dict(function='{name}.{func.__name__}'.format(
@@ -72,6 +67,17 @@ class BaseTest(object):
         interface = mock.models.create_interface(node.service, interface_name, operation_name,
                                                  operation_kwargs=operation_kwargs)
         node.interfaces[interface.name] = interface
+        ctx.model.node.update(node)
+
+        return node, interface_name, operation_name
+
+    @staticmethod
+    def _op(node,
+            operation_name,
+            arguments=None,
+            max_attempts=None,
+            retry_interval=None,
+            ignore_failure=None):
 
         return api.task.OperationTask(
             node,
@@ -158,9 +164,11 @@ class TestEngine(BaseTest):
         assert execution.status == models.Execution.SUCCEEDED
 
     def test_single_task_successful_execution(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(workflow_context, mock_success_task)
+
         @workflow
         def mock_workflow(ctx, graph):
-            graph.add_tasks(self._op(ctx, func=mock_success_task))
+            graph.add_tasks(self._op(node, operation_name))
         self._execute(
             workflow_func=mock_workflow,
             workflow_context=workflow_context,
@@ -170,9 +178,11 @@ class TestEngine(BaseTest):
         assert global_test_holder.get('sent_task_signal_calls') == 1
 
     def test_single_task_failed_execution(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(workflow_context, mock_failed_task)
+
         @workflow
         def mock_workflow(ctx, graph):
-            graph.add_tasks(self._op(ctx, func=mock_failed_task))
+            graph.add_tasks(self._op(node, operation_name))
         with pytest.raises(exceptions.ExecutorException):
             self._execute(
                 workflow_func=mock_workflow,
@@ -187,10 +197,13 @@ class TestEngine(BaseTest):
         assert execution.status == models.Execution.FAILED
 
     def test_two_tasks_execution_order(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_ordered_task, {'counter': 1})
+
         @workflow
         def mock_workflow(ctx, graph):
-            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
-            op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
+            op1 = self._op(node, operation_name, arguments={'counter': 1})
+            op2 = self._op(node, operation_name, arguments={'counter': 2})
             graph.sequence(op1, op2)
         self._execute(
             workflow_func=mock_workflow,
@@ -202,11 +215,14 @@ class TestEngine(BaseTest):
         assert global_test_holder.get('sent_task_signal_calls') == 2
 
     def test_stub_and_subworkflow_execution(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_ordered_task, {'counter': 1})
+
         @workflow
         def sub_workflow(ctx, graph):
-            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
+            op1 = self._op(node, operation_name, arguments={'counter': 1})
             op2 = api.task.StubTask()
-            op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
+            op3 = self._op(node, operation_name, arguments={'counter': 2})
             graph.sequence(op1, op2, op3)
 
         @workflow
@@ -225,11 +241,13 @@ class TestCancel(BaseTest):
 
     def test_cancel_started_execution(self, workflow_context, executor):
         number_of_tasks = 100
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_sleep_task, {'seconds': 0.1})
 
         @workflow
         def mock_workflow(ctx, graph):
             operations = (
-                self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1))
+                self._op(node, operation_name, arguments=dict(seconds=0.1))
                 for _ in range(number_of_tasks)
             )
             return graph.sequence(*operations)
@@ -267,9 +285,12 @@ class TestCancel(BaseTest):
 class TestRetries(BaseTest):
 
     def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
+            op = self._op(node, operation_name,
                           arguments={'failure_count': 1},
                           max_attempts=2)
             graph.add_tasks(op)
@@ -283,9 +304,12 @@ class TestRetries(BaseTest):
         assert global_test_holder.get('sent_task_signal_calls') == 2
 
     def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
+            op = self._op(node, operation_name,
                           arguments={'failure_count': 2},
                           max_attempts=2)
             graph.add_tasks(op)
@@ -300,9 +324,11 @@ class TestRetries(BaseTest):
         assert global_test_holder.get('sent_task_signal_calls') == 2
 
     def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_conditional_failure_task, {'failure_count': 1})
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
+            op = self._op(node, operation_name,
                           arguments={'failure_count': 1},
                           max_attempts=3)
             graph.add_tasks(op)
@@ -316,9 +342,12 @@ class TestRetries(BaseTest):
         assert global_test_holder.get('sent_task_signal_calls') == 2
 
     def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_conditional_failure_task, {'failure_count': 1})
+
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
+            op = self._op(node, operation_name,
                           arguments={'failure_count': 2},
                           max_attempts=3)
             graph.add_tasks(op)
@@ -332,9 +361,11 @@ class TestRetries(BaseTest):
         assert global_test_holder.get('sent_task_signal_calls') == 3
 
     def test_infinite_retries(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_conditional_failure_task, {'failure_count': 1})
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
+            op = self._op(node, operation_name,
                           arguments={'failure_count': 1},
                           max_attempts=-1)
             graph.add_tasks(op)
@@ -358,9 +389,11 @@ class TestRetries(BaseTest):
                                   executor=executor)
 
     def _test_retry_interval(self, retry_interval, workflow_context, executor):
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_conditional_failure_task, {'failure_count': 1})
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
+            op = self._op(node, operation_name,
                           arguments={'failure_count': 1},
                           max_attempts=2,
                           retry_interval=retry_interval)
@@ -378,9 +411,11 @@ class TestRetries(BaseTest):
         assert global_test_holder.get('sent_task_signal_calls') == 2
 
     def test_ignore_failure(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_conditional_failure_task, {'failure_count': 1})
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
+            op = self._op(node, operation_name,
                           ignore_failure=True,
                           arguments={'failure_count': 100},
                           max_attempts=100)
@@ -401,10 +436,12 @@ class TestTaskRetryAndAbort(BaseTest):
 
     def test_task_retry_default_interval(self, workflow_context, executor):
         default_retry_interval = 0.1
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_task_retry, {'message': self.message})
 
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_task_retry,
+            op = self._op(node, operation_name,
                           arguments={'message': self.message},
                           retry_interval=default_retry_interval,
                           max_attempts=2)
@@ -425,10 +462,13 @@ class TestTaskRetryAndAbort(BaseTest):
     def test_task_retry_custom_interval(self, workflow_context, executor):
         default_retry_interval = 100
         custom_retry_interval = 0.1
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_task_retry, {'message': self.message,
+                                                'retry_interval': custom_retry_interval})
 
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_task_retry,
+            op = self._op(node, operation_name,
                           arguments={'message': self.message,
                                      'retry_interval': custom_retry_interval},
                           retry_interval=default_retry_interval,
@@ -449,9 +489,11 @@ class TestTaskRetryAndAbort(BaseTest):
         assert global_test_holder.get('sent_task_signal_calls') == 2
 
     def test_task_abort(self, workflow_context, executor):
+        node, _, operation_name = self._create_interface(
+            workflow_context, mock_task_abort, {'message': self.message})
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_task_abort,
+            op = self._op(node, operation_name,
                           arguments={'message': self.message},
                           retry_interval=100,
                           max_attempts=100)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 7969457..5f0b75f 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -32,19 +32,23 @@ def test_decorate_extension(context, executor):
     def get_node(ctx):
         return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
 
+    node = get_node(context)
+    interface_name = 'test_interface'
+    operation_name = 'operation'
+    interface = mock.models.create_interface(
+        context.service,
+        interface_name,
+        operation_name,
+        operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__),
+                              arguments=arguments)
+    )
+    node.interfaces[interface.name] = interface
+    context.model.node.update(node)
+
+
     @workflow
     def mock_workflow(ctx, graph):
         node = get_node(ctx)
-        interface_name = 'test_interface'
-        operation_name = 'operation'
-        interface = mock.models.create_interface(
-            ctx.service,
-            interface_name,
-            operation_name,
-            operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__),
-                                  arguments=arguments)
-        )
-        node.interfaces[interface.name] = interface
         task = api.task.OperationTask(
             node,
             interface_name=interface_name,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 2d80a3b..7dbcc5a 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -83,20 +83,22 @@ def test_apply_tracked_changes_during_an_operation(context, executor):
 
 
 def _run_workflow(context, executor, op_func, arguments=None):
+    node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+    interface_name = 'test_interface'
+    operation_name = 'operation'
+    wf_arguments = arguments or {}
+    interface = mock.models.create_interface(
+        context.service,
+        interface_name,
+        operation_name,
+        operation_kwargs=dict(function=_operation_mapping(op_func),
+                              arguments=wf_arguments)
+    )
+    node.interfaces[interface.name] = interface
+    context.model.node.update(node)
+
     @workflow
     def mock_workflow(ctx, graph):
-        node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-        interface_name = 'test_interface'
-        operation_name = 'operation'
-        wf_arguments = arguments or {}
-        interface = mock.models.create_interface(
-            ctx.service,
-            interface_name,
-            operation_name,
-            operation_kwargs=dict(function=_operation_mapping(op_func),
-                                  arguments=wf_arguments)
-        )
-        node.interfaces[interface.name] = interface
         task = api.task.OperationTask(
             node,
             interface_name=interface_name,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/storage/test_collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_collection_instrumentation.py b/tests/storage/test_collection_instrumentation.py
new file mode 100644
index 0000000..e915421
--- /dev/null
+++ b/tests/storage/test_collection_instrumentation.py
@@ -0,0 +1,257 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.modeling import models
+from aria.storage import collection_instrumentation
+
+
+class MockActor(object):
+    def __init__(self):
+        self.dict_ = {}
+        self.list_ = []
+
+
+class MockMAPI(object):
+
+    def __init__(self):
+        pass
+
+    def put(self, *args, **kwargs):
+        pass
+
+    def update(self, *args, **kwargs):
+        pass
+
+
+class CollectionInstrumentation(object):
+
+    @pytest.fixture
+    def actor(self):
+        return MockActor()
+
+    @pytest.fixture
+    def model(self):
+        return MockMAPI()
+
+    @pytest.fixture
+    def dict_(self, actor, model):
+        return collection_instrumentation._InstrumentedDict(model, actor, 'dict_', models.Attribute)
+
+    @pytest.fixture
+    def list_(self, actor, model):
+        return collection_instrumentation._InstrumentedList(model, actor, 'list_', models.Attribute)
+
+
+class TestDict(CollectionInstrumentation):
+
+    def test_keys(self, actor, dict_):
+        dict_.update(
+            {
+                'key1': models.Attribute.wrap('key1', 'value1'),
+                'key2': models.Attribute.wrap('key2', 'value2')
+            }
+        )
+        assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
+
+    def test_values(self, actor, dict_):
+        dict_.update({
+            'key1': models.Attribute.wrap('key1', 'value1'),
+            'key2': models.Attribute.wrap('key1', 'value2')
+        })
+        assert (sorted(dict_.values()) ==
+                sorted(['value1', 'value2']) ==
+                sorted(v.value for v in actor.dict_.values()))
+
+    def test_items(self, dict_):
+        dict_.update({
+            'key1': models.Attribute.wrap('key1', 'value1'),
+            'key2': models.Attribute.wrap('key1', 'value2')
+        })
+        assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')])
+
+    def test_iter(self, actor, dict_):
+        dict_.update({
+            'key1': models.Attribute.wrap('key1', 'value1'),
+            'key2': models.Attribute.wrap('key1', 'value2')
+        })
+        assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
+
+    def test_bool(self, dict_):
+        assert not dict_
+        dict_.update({
+            'key1': models.Attribute.wrap('key1', 'value1'),
+            'key2': models.Attribute.wrap('key1', 'value2')
+        })
+        assert dict_
+
+    def test_set_item(self, actor, dict_):
+        dict_['key1'] = models.Attribute.wrap('key1', 'value1')
+        assert dict_['key1'] == 'value1' == actor.dict_['key1'].value
+        assert isinstance(actor.dict_['key1'], models.Attribute)
+
+    def test_nested(self, actor, dict_):
+        dict_['key'] = {}
+        assert isinstance(actor.dict_['key'], models.Attribute)
+        assert dict_['key'] == actor.dict_['key'].value == {}
+
+        dict_['key']['inner_key'] = 'value'
+
+        assert len(dict_) == 1
+        assert 'inner_key' in dict_['key']
+        assert dict_['key']['inner_key'] == 'value'
+        assert dict_['key'].keys() == ['inner_key']
+        assert dict_['key'].values() == ['value']
+        assert dict_['key'].items() == [('inner_key', 'value')]
+        assert isinstance(actor.dict_['key'], models.Attribute)
+        assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
+
+        dict_['key'].update({'updated_key': 'updated_value'})
+        assert len(dict_) == 1
+        assert 'updated_key' in dict_['key']
+        assert dict_['key']['updated_key'] == 'updated_value'
+        assert sorted(dict_['key'].keys()) == sorted(['inner_key', 'updated_key'])
+        assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value'])
+        assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'),
+                                                       ('updated_key', 'updated_value')])
+        assert isinstance(actor.dict_['key'], models.Attribute)
+        assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
+
+        dict_.update({'key': 'override_value'})
+        assert len(dict_) == 1
+        assert 'key' in dict_
+        assert dict_['key'] == 'override_value'
+        assert len(actor.dict_) == 1
+        assert isinstance(actor.dict_['key'], models.Attribute)
+        assert actor.dict_['key'].value == 'override_value'
+
+    def test_get_item(self, actor, dict_):
+        dict_['key1'] = models.Attribute.wrap('key1', 'value1')
+        assert isinstance(actor.dict_['key1'], models.Attribute)
+
+    def test_update(self, actor, dict_):
+        dict_['key1'] = 'value1'
+
+        new_dict = {'key2': 'value2'}
+        dict_.update(new_dict)
+        assert len(dict_) == 2
+        assert dict_['key2'] == 'value2'
+        assert isinstance(actor.dict_['key2'], models.Attribute)
+
+        new_dict = {}
+        new_dict.update(dict_)
+        assert new_dict['key1'] == dict_['key1']
+
+    def test_copy(self, dict_):
+        dict_['key1'] = 'value1'
+
+        new_dict = dict_.copy()
+        assert new_dict is not dict_
+        assert new_dict == dict_
+
+        dict_['key1'] = 'value2'
+        assert new_dict['key1'] == 'value1'
+        assert dict_['key1'] == 'value2'
+
+    def test_clear(self, dict_):
+        dict_['key1'] = 'value1'
+        dict_.clear()
+
+        assert len(dict_) == 0
+
+
+class TestList(CollectionInstrumentation):
+
+    def test_append(self, actor, list_):
+        list_.append(models.Attribute.wrap('name', 'value1'))
+        list_.append('value2')
+        assert len(actor.list_) == 2
+        assert len(list_) == 2
+        assert isinstance(actor.list_[0], models.Attribute)
+        assert list_[0] == 'value1'
+
+        assert isinstance(actor.list_[1], models.Attribute)
+        assert list_[1] == 'value2'
+
+        list_[0] = 'new_value1'
+        list_[1] = 'new_value2'
+        assert isinstance(actor.list_[1], models.Attribute)
+        assert isinstance(actor.list_[1], models.Attribute)
+        assert list_[0] == 'new_value1'
+        assert list_[1] == 'new_value2'
+
+    def test_iter(self, list_):
+        list_.append('value1')
+        list_.append('value2')
+        assert sorted(list_) == sorted(['value1', 'value2'])
+
+    def test_insert(self, actor, list_):
+        list_.append('value1')
+        list_.insert(0, 'value2')
+        list_.insert(2, 'value3')
+        list_.insert(10, 'value4')
+        assert sorted(list_) == sorted(['value1', 'value2', 'value3', 'value4'])
+        assert len(actor.list_) == 4
+
+    def test_set(self, list_):
+        list_.append('value1')
+        list_.append('value2')
+
+        list_[1] = 'value3'
+        assert len(list_) == 2
+        assert sorted(list_) == sorted(['value1', 'value3'])
+
+    def test_insert_into_nested(self, actor, list_):
+        list_.append([])
+
+        list_[0].append('inner_item')
+        assert isinstance(actor.list_[0], models.Attribute)
+        assert len(list_) == 1
+        assert list_[0][0] == 'inner_item'
+
+        list_[0].append('new_item')
+        assert isinstance(actor.list_[0], models.Attribute)
+        assert len(list_) == 1
+        assert list_[0][1] == 'new_item'
+
+        assert list_[0] == ['inner_item', 'new_item']
+        assert ['inner_item', 'new_item'] == list_[0]
+
+
+class TestDictList(CollectionInstrumentation):
+    def test_dict_in_list(self, actor, list_):
+        list_.append({})
+        assert len(list_) == 1
+        assert isinstance(actor.list_[0], models.Attribute)
+        assert actor.list_[0].value == {}
+
+        list_[0]['key'] = 'value'
+        assert list_[0]['key'] == 'value'
+        assert len(actor.list_) == 1
+        assert isinstance(actor.list_[0], models.Attribute)
+        assert actor.list_[0].value['key'] == 'value'
+
+    def test_list_in_dict(self, actor, dict_):
+        dict_['key'] = []
+        assert len(dict_) == 1
+        assert isinstance(actor.dict_['key'], models.Attribute)
+        assert actor.dict_['key'].value == []
+
+        dict_['key'].append('value')
+        assert dict_['key'][0] == 'value'
+        assert len(actor.dict_) == 1
+        assert isinstance(actor.dict_['key'], models.Attribute)
+        assert actor.dict_['key'].value[0] == 'value'