You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/04 12:11:43 UTC

incubator-ariatosca git commit: ARIA-262 Inconsistent node attributes behavior [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-262-Inconsistent-node-attributes-behavior 225971037 -> 81dbad76c (forced update)


ARIA-262 Inconsistent node attributes behavior


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/81dbad76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/81dbad76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/81dbad76

Branch: refs/heads/ARIA-262-Inconsistent-node-attributes-behavior
Commit: 81dbad76c7b86bbdaf6b4e87d8b709a0caec6ae0
Parents: 9174f94
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed May 31 21:07:49 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Jun 4 15:11:26 2017 +0300

----------------------------------------------------------------------
 .../context/collection_instrumentation.py       | 242 ---------------
 aria/orchestrator/context/operation.py          |  14 +-
 aria/orchestrator/decorators.py                 |  14 +-
 aria/storage/api.py                             |   1 +
 aria/storage/collection_instrumentation.py      | 291 +++++++++++++++++++
 aria/storage/core.py                            |  22 ++
 aria/storage/sql_mapi.py                        |  12 +-
 aria/utils/imports.py                           |   2 +-
 aria/utils/validation.py                        |   2 +-
 .../context/test_collection_instrumentation.py  |  16 +-
 tests/orchestrator/context/test_operation.py    |   1 -
 tests/orchestrator/execution_plugin/test_ssh.py |   8 +-
 tests/orchestrator/workflows/core/test_task.py  |   2 +-
 .../executor/test_process_executor_extension.py |   5 +-
 14 files changed, 359 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/aria/orchestrator/context/collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/collection_instrumentation.py b/aria/orchestrator/context/collection_instrumentation.py
deleted file mode 100644
index 91cfd35..0000000
--- a/aria/orchestrator/context/collection_instrumentation.py
+++ /dev/null
@@ -1,242 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from functools import partial
-
-from aria.modeling import models
-
-
-class _InstrumentedCollection(object):
-
-    def __init__(self,
-                 model,
-                 parent,
-                 field_name,
-                 seq=None,
-                 is_top_level=True,
-                 **kwargs):
-        self._model = model
-        self._parent = parent
-        self._field_name = field_name
-        self._is_top_level = is_top_level
-        self._load(seq, **kwargs)
-
-    @property
-    def _raw(self):
-        raise NotImplementedError
-
-    def _load(self, seq, **kwargs):
-        """
-        Instantiates the object from existing seq.
-
-        :param seq: the original sequence to load from
-        :return:
-        """
-        raise NotImplementedError
-
-    def _set(self, key, value):
-        """
-        set the changes for the current object (not in the db)
-
-        :param key:
-        :param value:
-        :return:
-        """
-        raise NotImplementedError
-
-    def _del(self, collection, key):
-        raise NotImplementedError
-
-    def _instrument(self, key, value):
-        """
-        Instruments any collection to track changes (and ease of access)
-        :param key:
-        :param value:
-        :return:
-        """
-        if isinstance(value, _InstrumentedCollection):
-            return value
-        elif isinstance(value, dict):
-            instrumentation_cls = _InstrumentedDict
-        elif isinstance(value, list):
-            instrumentation_cls = _InstrumentedList
-        else:
-            return value
-
-        return instrumentation_cls(self._model, self, key, value, False)
-
-    @staticmethod
-    def _raw_value(value):
-        """
-        Get the raw value.
-        :param value:
-        :return:
-        """
-        if isinstance(value, models.Parameter):
-            return value.value
-        return value
-
-    @staticmethod
-    def _encapsulate_value(key, value):
-        """
-        Create a new item cls if needed.
-        :param key:
-        :param value:
-        :return:
-        """
-        if isinstance(value, models.Parameter):
-            return value
-        # If it is not wrapped
-        return models.Parameter.wrap(key, value)
-
-    def __setitem__(self, key, value):
-        """
-        Update the values in both the local and the db locations.
-        :param key:
-        :param value:
-        :return:
-        """
-        self._set(key, value)
-        if self._is_top_level:
-            # We are at the top level
-            field = getattr(self._parent, self._field_name)
-            mapi = getattr(self._model, models.Parameter.__modelname__)
-            value = self._set_field(field,
-                                    key,
-                                    value if key in field else self._encapsulate_value(key, value))
-            mapi.update(value)
-        else:
-            # We are not at the top level
-            self._set_field(self._parent, self._field_name, self)
-
-    def _set_field(self, collection, key, value):
-        """
-        enables updating the current change in the ancestors
-        :param collection: the collection to change
-        :param key: the key for the specific field
-        :param value: the new value
-        :return:
-        """
-        if isinstance(value, _InstrumentedCollection):
-            value = value._raw
-        if key in collection and isinstance(collection[key], models.Parameter):
-            if isinstance(collection[key], _InstrumentedCollection):
-                self._del(collection, key)
-            collection[key].value = value
-        else:
-            collection[key] = value
-        return collection[key]
-
-    def __deepcopy__(self, *args, **kwargs):
-        return self._raw
-
-
-class _InstrumentedDict(_InstrumentedCollection, dict):
-
-    def _load(self, dict_=None, **kwargs):
-        dict.__init__(
-            self,
-            tuple((key, self._raw_value(value)) for key, value in (dict_ or {}).items()),
-            **kwargs)
-
-    def update(self, dict_=None, **kwargs):
-        dict_ = dict_ or {}
-        for key, value in dict_.items():
-            self[key] = value
-        for key, value in kwargs.items():
-            self[key] = value
-
-    def __getitem__(self, key):
-        return self._instrument(key, dict.__getitem__(self, key))
-
-    def _set(self, key, value):
-        dict.__setitem__(self, key, self._raw_value(value))
-
-    @property
-    def _raw(self):
-        return dict(self)
-
-    def _del(self, collection, key):
-        del collection[key]
-
-
-class _InstrumentedList(_InstrumentedCollection, list):
-
-    def _load(self, list_=None, **kwargs):
-        list.__init__(self, list(item for item in list_ or []))
-
-    def append(self, value):
-        self.insert(len(self), value)
-
-    def insert(self, index, value):
-        list.insert(self, index, self._raw_value(value))
-        if self._is_top_level:
-            field = getattr(self._parent, self._field_name)
-            field.insert(index, self._encapsulate_value(index, value))
-        else:
-            self._parent[self._field_name] = self
-
-    def __getitem__(self, key):
-        return self._instrument(key, list.__getitem__(self, key))
-
-    def _set(self, key, value):
-        list.__setitem__(self, key, value)
-
-    def _del(self, collection, key):
-        del collection[key]
-
-    @property
-    def _raw(self):
-        return list(self)
-
-
-class _InstrumentedModel(object):
-
-    def __init__(self, field_name, original_model, model_storage):
-        super(_InstrumentedModel, self).__init__()
-        self._field_name = field_name
-        self._model_storage = model_storage
-        self._original_model = original_model
-        self._apply_instrumentation()
-
-    def __getattr__(self, item):
-        return getattr(self._original_model, item)
-
-    def _apply_instrumentation(self):
-
-        field = getattr(self._original_model, self._field_name)
-
-        # Preserve the original value. e.g. original attributes would be located under
-        # _attributes
-        setattr(self, '_{0}'.format(self._field_name), field)
-
-        # set instrumented value
-        setattr(self, self._field_name, _InstrumentedDict(self._model_storage,
-                                                          self._original_model,
-                                                          self._field_name,
-                                                          field))
-
-
-def instrument_collection(field_name, func=None):
-    if func is None:
-        return partial(instrument_collection, field_name)
-
-    def _wrapper(*args, **kwargs):
-        original_model = func(*args, **kwargs)
-        return type('Instrumented{0}'.format(original_model.__class__.__name__),
-                    (_InstrumentedModel, ),
-                    {})(field_name, original_model, args[0].model)
-
-    return _wrapper

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index f0ba337..efdc04d 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -21,10 +21,7 @@ import threading
 
 import aria
 from aria.utils import file
-from . import (
-    common,
-    collection_instrumentation
-)
+from . import common
 
 
 class BaseOperationContext(common.BaseContext):
@@ -76,7 +73,6 @@ class BaseOperationContext(common.BaseContext):
 
     @property
     def serialization_dict(self):
-        context_cls = self.__class__
         context_dict = {
             'name': self.name,
             'service_id': self._service_id,
@@ -89,7 +85,7 @@ class BaseOperationContext(common.BaseContext):
             'logger_level': self.logger.level
         }
         return {
-            'context_cls': context_cls,
+            'context_cls': self.__class__,
             'context': context_dict
         }
 
@@ -117,7 +113,6 @@ class NodeOperationContext(BaseOperationContext):
     """
 
     @property
-    @collection_instrumentation.instrument_collection('attributes')
     def node_template(self):
         """
         the node of the current operation
@@ -126,7 +121,6 @@ class NodeOperationContext(BaseOperationContext):
         return self.node.node_template
 
     @property
-    @collection_instrumentation.instrument_collection('attributes')
     def node(self):
         """
         The node instance of the current operation
@@ -141,7 +135,6 @@ class RelationshipOperationContext(BaseOperationContext):
     """
 
     @property
-    @collection_instrumentation.instrument_collection('attributes')
     def source_node_template(self):
         """
         The source node
@@ -150,7 +143,6 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.source_node.node_template
 
     @property
-    @collection_instrumentation.instrument_collection('attributes')
     def source_node(self):
         """
         The source node instance
@@ -159,7 +151,6 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.relationship.source_node
 
     @property
-    @collection_instrumentation.instrument_collection('attributes')
     def target_node_template(self):
         """
         The target node
@@ -168,7 +159,6 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.target_node.node_template
 
     @property
-    @collection_instrumentation.instrument_collection('attributes')
     def target_node(self):
         """
         The target node instance

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/aria/orchestrator/decorators.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py
index 4051a54..5209abb 100644
--- a/aria/orchestrator/decorators.py
+++ b/aria/orchestrator/decorators.py
@@ -19,6 +19,7 @@ Workflow and operation decorators
 
 from functools import partial, wraps
 
+from aria.modeling import models
 from ..utils.validation import validate_function_arguments
 from ..utils.uuid import generate_uuid
 
@@ -48,7 +49,7 @@ def workflow(func=None, suffix_template=''):
 
         workflow_parameters.setdefault('ctx', ctx)
         workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name))
-        validate_function_arguments(func, workflow_parameters)
+        validate_function_arguments(func, **workflow_parameters)
         with context.workflow.current.push(ctx):
             func(**workflow_parameters)
         return workflow_parameters['graph']
@@ -68,11 +69,16 @@ def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=No
 
     @wraps(func)
     def _wrapper(**func_kwargs):
+        ctx = func_kwargs.pop('ctx')
         if toolbelt:
-            operation_toolbelt = context.toolbelt(func_kwargs['ctx'])
+            operation_toolbelt = context.toolbelt(ctx)
             func_kwargs.setdefault('toolbelt', operation_toolbelt)
-        validate_function_arguments(func, func_kwargs)
-        return func(**func_kwargs)
+        validate_function_arguments(func, ctx=ctx, **func_kwargs)
+        with ctx.model.instrument({
+            models.Node: ['attributes', 'properties'],
+            models.NodeTemplate: ['attributes', 'properties']
+        }):
+            return func(ctx=ctx, **func_kwargs)
     return _wrapper
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/aria/storage/api.py
----------------------------------------------------------------------
diff --git a/aria/storage/api.py b/aria/storage/api.py
index ed8a2ff..11c7e18 100644
--- a/aria/storage/api.py
+++ b/aria/storage/api.py
@@ -45,6 +45,7 @@ class ModelAPI(StorageAPI):
         super(ModelAPI, self).__init__(**kwargs)
         self._model_cls = model_cls
         self._name = name or model_cls.__modelname__
+        self._instrumentation = {}
 
     @property
     def name(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/aria/storage/collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/collection_instrumentation.py b/aria/storage/collection_instrumentation.py
new file mode 100644
index 0000000..cb6c81a
--- /dev/null
+++ b/aria/storage/collection_instrumentation.py
@@ -0,0 +1,291 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ..modeling import models
+from . import exceptions
+
+
+class _InstrumentedCollection(object):
+
+    def __init__(self,
+                 mapi,
+                 parent,
+                 field_name,
+                 seq=None,
+                 is_top_level=True,
+                 **kwargs):
+        self._mapi = mapi
+        self._parent = parent
+        self._field_name = field_name
+        self._is_top_level = is_top_level
+        self._load(seq, **kwargs)
+
+    @property
+    def _raw(self):
+        raise NotImplementedError
+
+    def _load(self, seq, **kwargs):
+        """
+        Instantiates the object from existing seq.
+
+        :param seq: the original sequence to load from
+        :return:
+        """
+        raise NotImplementedError
+
+    def _set(self, key, value):
+        """
+        set the changes for the current object (not in the db)
+
+        :param key:
+        :param value:
+        :return:
+        """
+        raise NotImplementedError
+
+    def _del(self, collection, key):
+        raise NotImplementedError
+
+    def _instrument(self, key, value):
+        """
+        Instruments any collection to track changes (and ease of access)
+        :param key:
+        :param value:
+        :return:
+        """
+        if isinstance(value, _InstrumentedCollection):
+            return value
+        elif isinstance(value, dict):
+            instrumentation_cls = _InstrumentedDict
+        elif isinstance(value, list):
+            instrumentation_cls = _InstrumentedList
+        else:
+            return value
+
+        return instrumentation_cls(self._mapi, self, key, value, False)
+
+    @staticmethod
+    def _raw_value(value):
+        """
+        Get the raw value.
+        :param value:
+        :return:
+        """
+        if isinstance(value, models.Parameter):
+            return value.value
+        return value
+
+    @staticmethod
+    def _encapsulate_value(key, value):
+        """
+        Create a new item cls if needed.
+        :param key:
+        :param value:
+        :return:
+        """
+        if isinstance(value, models.Parameter):
+            return value
+        # If it is not wrapped
+        return models.Parameter.wrap(key, value)
+
+    def __setitem__(self, key, value):
+        """
+        Update the values in both the local and the db locations.
+        :param key:
+        :param value:
+        :return:
+        """
+        self._set(key, value)
+        if self._is_top_level:
+            # We are at the top level
+            field = getattr(self._parent, self._field_name)
+            self._set_field(
+                field, key, value if key in field else self._encapsulate_value(key, value))
+            self._mapi.update(self._parent)
+        else:
+            # We are not at the top level
+            self._set_field(self._parent, self._field_name, self)
+
+    def _set_field(self, collection, key, value):
+        """
+        enables updating the current change in the ancestors
+        :param collection: the collection to change
+        :param key: the key for the specific field
+        :param value: the new value
+        :return:
+        """
+        if isinstance(value, _InstrumentedCollection):
+            value = value._raw
+        if key in collection and isinstance(collection[key], models.Parameter):
+            if isinstance(collection[key], _InstrumentedCollection):
+                self._del(collection, key)
+            collection[key].value = value
+        else:
+            collection[key] = value
+        return collection[key]
+
+    def __deepcopy__(self, *args, **kwargs):
+        return self._raw
+
+
+class _InstrumentedDict(_InstrumentedCollection, dict):
+
+    def _load(self, dict_=None, **kwargs):
+        dict.__init__(
+            self,
+            tuple((key, self._raw_value(value)) for key, value in (dict_ or {}).items()),
+            **kwargs)
+
+    def update(self, dict_=None, **kwargs):
+        dict_ = dict_ or {}
+        for key, value in dict_.items():
+            self[key] = value
+        for key, value in kwargs.items():
+            self[key] = value
+
+    def __getitem__(self, key):
+        return self._instrument(key, dict.__getitem__(self, key))
+
+    def _set(self, key, value):
+        dict.__setitem__(self, key, self._raw_value(value))
+
+    @property
+    def _raw(self):
+        return dict(self)
+
+    def _del(self, collection, key):
+        del collection[key]
+
+
+class _InstrumentedList(_InstrumentedCollection, list):
+
+    def _load(self, list_=None, **kwargs):
+        list.__init__(self, list(item for item in list_ or []))
+
+    def append(self, value):
+        self.insert(len(self), value)
+
+    def insert(self, index, value):
+        list.insert(self, index, self._raw_value(value))
+        if self._is_top_level:
+            field = getattr(self._parent, self._field_name)
+            field.insert(index, self._encapsulate_value(index, value))
+        else:
+            self._parent[self._field_name] = self
+
+    def __getitem__(self, key):
+        return self._instrument(key, list.__getitem__(self, key))
+
+    def _set(self, key, value):
+        list.__setitem__(self, key, value)
+
+    def _del(self, collection, key):
+        del collection[key]
+
+    @property
+    def _raw(self):
+        return list(self)
+
+
+class _InstrumentedModel(object):
+
+    def __init__(self, original_model, mapi, instrumentation):
+        """
+        The original model
+        :param original_model: the model to be instrumented
+        :param mapi: the mapi for that model
+        """
+        super(_InstrumentedModel, self).__init__()
+        self._original_model = original_model
+        self._mapi = mapi
+        self._instrumentation = instrumentation
+        self._apply_instrumentation()
+
+    def __getattr__(self, item):
+        return_value = getattr(self._original_model, item)
+        if isinstance(return_value, (list, dict)):
+            return _WrappedModel(return_value, self._instrumentation, mapi=self._mapi)
+        return return_value
+
+    def _apply_instrumentation(self):
+        for field_name in self._instrumentation[self._original_model.__class__]:
+            field = getattr(self._original_model, field_name)
+
+            # Preserve the original value. e.g. original attributes would be located under
+            # _attributes
+            setattr(self, '_{0}'.format(field_name), field)
+
+            # set instrumented value
+            if isinstance(field, dict):
+                instrumentation_cls = _InstrumentedDict
+            elif isinstance(field, list):
+                instrumentation_cls = _InstrumentedList
+            else:
+                # TODO: raise proper error
+                raise exceptions.StorageError(
+                    "ARIA supports instrumentation for dict and list. Field {field} of the "
+                    "class {model} is of {type} type.".format(
+                        field=field,
+                        model=self._original_model,
+                        type=type(field)))
+
+            instrumented_class = instrumentation_cls(seq=field,
+                                                     parent=self._original_model,
+                                                     mapi=self._mapi,
+                                                     field_name=field_name)
+            setattr(self, field_name, instrumented_class)
+
+
+class _WrappedModel(object):
+
+    def __init__(self, wrapped, instrumentation, **kwargs):
+        """
+
+        :param instrumented_cls: The class to be instrumented
+        :param instrumentation_cls: the instrumentation cls
+        :param wrapped: the currently wrapped instance
+        :param kwargs: and kwargs to te passed to the instrumented class.
+        """
+        self._kwargs = kwargs
+        self._instrumentation = instrumentation
+        self._wrapped = wrapped
+
+    def _wrap(self, value):
+        if value.__class__ in self._instrumentation:
+            return _InstrumentedModel(value, instrumentation=self._instrumentation, **self._kwargs)
+        elif isinstance(value, models.aria_declarative_base):
+            return _WrappedModel(value, self._instrumentation, **self._kwargs)
+        return value
+
+    def __getattr__(self, item):
+        if hasattr(self, '_wrapped'):
+            return self._wrap(getattr(self._wrapped, item))
+        else:
+            super(_WrappedModel, self).__getattribute__(item)
+
+    def __getitem__(self, item):
+        return self._wrap(self._wrapped[item])
+
+
+def instrument(instrumentation, original_model, mapi):
+    for instrumented_cls in instrumentation:
+        if isinstance(original_model, instrumented_cls):
+            return type('Instrumented{0}'.format(original_model.__class__.__name__),
+                        (_InstrumentedModel, ),
+                        {})(original_model, mapi, instrumentation)
+
+    return type('Wrapped{0}'.format(original_model.__class__.__name__),
+                (_WrappedModel, ),
+                {})(original_model, instrumentation, mapi=mapi)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index 8302fc9..b752505 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -37,6 +37,8 @@ API:
     * drivers - module, a pool of ARIA standard drivers.
     * StorageDriver - class, abstract model implementation.
 """
+import copy
+from contextlib import contextmanager
 
 from aria.logger import LoggerMixin
 from . import sql_mapi
@@ -165,3 +167,23 @@ class ModelStorage(Storage):
         """
         for mapi in self.registered.values():
             mapi.drop()
+
+    @contextmanager
+    def instrument(self, instrumentation):
+
+        def _instrument(source_inst, target_inst=None):
+            for mapi in self.registered.values():
+                for cls, fields in source_inst.items():
+                    if target_inst is not None:
+                        target_inst[cls] = copy.copy(mapi._instrumentation.get(cls, []))
+                    if fields:
+                        mapi._instrumentation[cls] = fields
+                    else:
+                        del mapi._instrumentation[cls]
+
+        original_instrumentation = {}
+        try:
+            _instrument(instrumentation, original_instrumentation)
+            yield self
+        finally:
+            _instrument(original_instrumentation)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
index 730d007..b748234 100644
--- a/aria/storage/sql_mapi.py
+++ b/aria/storage/sql_mapi.py
@@ -29,6 +29,7 @@ from aria.utils.collections import OrderedDict
 from . import (
     api,
     exceptions,
+    collection_instrumentation
 )
 
 _predicates = {'ge': '__ge__',
@@ -63,7 +64,7 @@ class SQLAlchemyModelAPI(api.ModelAPI):
                 'Requested `{0}` with ID `{1}` was not found'
                 .format(self.model_cls.__name__, entry_id)
             )
-        return result
+        return self._instrument(result)
 
     def get_by_name(self, entry_name, include=None, **kwargs):
         assert hasattr(self.model_cls, 'name')
@@ -93,7 +94,7 @@ class SQLAlchemyModelAPI(api.ModelAPI):
 
         return ListResult(
             dict(total=total, size=size, offset=offset),
-            results
+            [self._instrument(result) for result in results]
         )
 
     def iter(self,
@@ -103,6 +104,7 @@ class SQLAlchemyModelAPI(api.ModelAPI):
              **kwargs):
         """Return a (possibly empty) list of `model_class` results
         """
+        # TODO: support suggaring in iter to
         return iter(self._get_query(include, filters, sort))
 
     def put(self, entry, **kwargs):
@@ -378,6 +380,12 @@ class SQLAlchemyModelAPI(api.ModelAPI):
         for rel in instance.__mapper__.relationships:
             getattr(instance, rel.key)
 
+    def _instrument(self, model):
+        if self._instrumentation:
+            return collection_instrumentation.instrument(self._instrumentation, model, self)
+        else:
+            return model
+
 
 def init_storage(base_dir, filename='db.sqlite'):
     """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/aria/utils/imports.py
----------------------------------------------------------------------
diff --git a/aria/utils/imports.py b/aria/utils/imports.py
index 64a48cf..35aa0fc 100644
--- a/aria/utils/imports.py
+++ b/aria/utils/imports.py
@@ -17,8 +17,8 @@
 Utility methods for dynamically loading python code
 """
 
-import importlib
 import pkgutil
+import importlib
 
 
 def import_fullname(name, paths=None):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/aria/utils/validation.py
----------------------------------------------------------------------
diff --git a/aria/utils/validation.py b/aria/utils/validation.py
index 193cb33..bcdeb7a 100644
--- a/aria/utils/validation.py
+++ b/aria/utils/validation.py
@@ -65,7 +65,7 @@ class ValidatorMixin(object):
                 name=argument_name, type='callable', arg=argument))
 
 
-def validate_function_arguments(func, func_kwargs):
+def validate_function_arguments(func, **func_kwargs):
     """
     Validates all required arguments are supplied to ``func`` and that no additional arguments are
     supplied

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/tests/orchestrator/context/test_collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_collection_instrumentation.py b/tests/orchestrator/context/test_collection_instrumentation.py
index 3ee5a44..066cc33 100644
--- a/tests/orchestrator/context/test_collection_instrumentation.py
+++ b/tests/orchestrator/context/test_collection_instrumentation.py
@@ -16,7 +16,7 @@
 import pytest
 
 from aria.modeling.models import Parameter
-from aria.orchestrator.context import collection_instrumentation
+from aria.storage import collection_instrumentation
 
 
 class MockActor(object):
@@ -25,12 +25,16 @@ class MockActor(object):
         self.list_ = []
 
 
-class MockModel(object):
+class MockMAPI(object):
 
     def __init__(self):
-        self.parameter = type('MockModel', (object, ), {'model_cls': Parameter,
-                                                        'put': lambda *args, **kwargs: None,
-                                                        'update': lambda *args, **kwargs: None})()
+        pass
+
+    def put(self, *args, **kwargs):
+        pass
+
+    def update(self, *args, **kwargs):
+        pass
 
 
 class CollectionInstrumentation(object):
@@ -41,7 +45,7 @@ class CollectionInstrumentation(object):
 
     @pytest.fixture
     def model(self):
-        return MockModel()
+        return MockMAPI()
 
     @pytest.fixture
     def dict_(self, actor, model):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 3dcfaa2..59df059 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -503,6 +503,5 @@ def attribute_consuming_operation(ctx, holder_path, **_):
     holder = helpers.FilesystemDataHolder(holder_path)
     ctx.target_node.attributes.update(ctx.source_node.attributes)
     holder.update(**ctx.target_node.attributes)
-
     ctx.target_node.attributes['key2'] = ctx.source_node.attributes['key2']
     holder['key2'] = ctx.target_node.attributes['key2']

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 899a007..2f0b39c 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -427,10 +427,17 @@ class TestFabricEnvHideGroupsAndRunCommands(object):
             def abort(message=None):
                 models.Task.abort(message)
             actor = None
+
         class Actor(object):
             host = None
+
+        class Model(object):
+            @contextlib.contextmanager
+            def instrument(self, *args, **kwargs):
+                yield
         task = Task
         task.actor = Actor
+        model = Model()
         logger = logging.getLogger()
 
     @staticmethod
@@ -439,7 +446,6 @@ class TestFabricEnvHideGroupsAndRunCommands(object):
         yield
     _Ctx.logging_handlers = _mock_self_logging
 
-
     @pytest.fixture(autouse=True)
     def _setup(self, mocker):
         self.default_fabric_env = {

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index a717e19..c0d3616 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -100,7 +100,7 @@ class TestOperationTask(object):
         storage_task = ctx.model.task.get_by_name(core_task.name)
         assert storage_task.plugin is storage_plugin
         assert storage_task.execution_name == ctx.execution.name
-        assert storage_task.actor == core_task.context.node._original_model
+        assert storage_task.actor == core_task.context.node
         assert core_task.model_task == storage_task
         assert core_task.name == api_task.name
         assert core_task.function == api_task.function

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/81dbad76/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index e4944df..f3e40b6 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -66,8 +66,9 @@ class MockProcessExecutorExtension(object):
     def decorate(self):
         def decorator(function):
             def wrapper(ctx, **operation_arguments):
-                ctx.node.attributes['out'] = {'wrapper_arguments': operation_arguments}
-                function(ctx=ctx, **operation_arguments)
+                with ctx.model.instrument({ctx.model.node.model_cls: ['attributes']}):
+                    ctx.node.attributes['out'] = {'wrapper_arguments': operation_arguments}
+                    function(ctx=ctx, **operation_arguments)
             return wrapper
         return decorator