You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/16 16:54:26 UTC

incubator-ariatosca git commit: made the dict like decorator more robust, added tests. removed the instrumentation entirely. added some session closing and engine disposing

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/runtime_props_to_attr 1c7347e6d -> 8e60d3f44


made the dict like decorator more robust, added tests. removed the instrumentation entirely. added some session closing and engine disposing


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8e60d3f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8e60d3f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8e60d3f4

Branch: refs/heads/runtime_props_to_attr
Commit: 8e60d3f447d3739f9becf0092d9ba4e240271bf9
Parents: 1c7347e
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue May 16 19:54:21 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue May 16 19:54:21 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/context/common.py             | 151 +++++++++++++++++++
 aria/orchestrator/context/operation.py          |  78 +---------
 .../execution_plugin/ctx_proxy/server.py        |   6 +-
 aria/orchestrator/workflows/executor/process.py | 125 +++------------
 aria/storage/instrumentation.py                 |  10 +-
 tests/orchestrator/context/test_operation.py    | 126 +++++++++++++++-
 .../orchestrator/execution_plugin/test_local.py |  66 ++++----
 tests/orchestrator/workflows/core/test_task.py  |   2 +-
 ...process_executor_concurrent_modifications.py |  12 +-
 .../executor/test_process_executor_extension.py |   6 +-
 .../test_process_executor_tracked_changes.py    |  51 +++----
 11 files changed, 382 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 0854a27..260ccea 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -18,6 +18,7 @@ A common context for both workflow and operation
 """
 
 import logging
+import collections
 from contextlib import contextmanager
 from functools import partial
 
@@ -195,3 +196,153 @@ class BaseContext(object):
         variables.setdefault('ctx', self)
         resource_template = jinja2.Template(resource_content)
         return resource_template.render(variables)
+
+
+class _Dict(collections.MutableMapping):
+    def __init__(self, actor, model, nested=None):
+        super(_Dict, self).__init__()
+        self._actor = actor
+        self._attributes = self._actor.attributes
+        self._model = model
+        self._attr_cls = self._model.parameter.model_cls
+        self._nested = nested or []
+
+    def __delitem__(self, key):
+        del self._nested_value[key]
+
+    def __contains__(self, item):
+        for key in self.keys():
+            if item == key:
+                return True
+        return False
+
+    def __len__(self):
+        return len(self._nested_value)
+
+    def __nonzero__(self):
+        return bool(self._nested_value)
+
+    def __getitem__(self, item):
+        if self._nested:
+            value = self._nested_value[item]
+        else:
+            value = self._attributes[item].value
+        if isinstance(value, dict):
+            return _Dict(self._actor, self._model, nested=self._nested + [item])
+        elif isinstance(value, self._attr_cls):
+            return value.value
+        return value
+
+    def __setitem__(self, key, value):
+        if self._nested or key in self._attributes:
+            attribute = self._update_attr(key, value)
+            self._model.parameter.update(attribute)
+        else:
+            attr = self._attr_cls.wrap(key, value)
+            self._attributes[key] = attr
+            self._model.parameter.put(attr)
+
+    @property
+    def _nested_value(self):
+        current = self._attributes
+        for k in self._nested:
+            current = current[k]
+        return current.value if isinstance(current, self._attr_cls) else current
+
+    def _update_attr(self, key, value):
+        current = self._attributes
+
+        # If this is nested, lets extract the Parameter itself
+        if self._nested:
+            attribute = current = current[self._nested[0]]
+            for k in self._nested[1:]:
+                current = current[k]
+            if isinstance(current, self._attr_cls):
+                current.value[key] = value
+            else:
+                current[key] = value
+        elif isinstance(current[key], self._attr_cls):
+            attribute = current[key]
+            attribute.value = value
+        else:
+            raise BaseException()
+
+        # Since this a user defined parameter, this doesn't track changes. So we override the entire
+        # thing.
+        if isinstance(attribute.value, dict):
+            value = attribute.value.copy()
+            attribute.value.clear()
+        attribute.value = value
+        return attribute
+
+    def _unwrap(self, attr):
+        return attr.unwrap() if isinstance(attr, self._attr_cls) else attr
+
+    def keys(self):
+        dict_ = (self._nested_value.value
+                 if isinstance(self._nested_value, self._attr_cls)
+                 else self._nested_value)
+        for key in dict_.keys():
+            yield key
+
+    def values(self):
+        for val in self._nested_value.values():
+            if isinstance(val, self._attr_cls):
+                yield val.value
+            else:
+                yield val
+
+    def items(self):
+        for key in self._nested_value:
+            val = self._nested_value[key]
+            if isinstance(val, self._attr_cls):
+                yield key, val.value
+            else:
+                yield key, val
+
+    def __dict__(self):
+        return dict(item for item in self.items())
+
+    def __iter__(self):
+        for key in self._nested_value.keys():
+            yield key
+
+    def __copy__(self):
+        return dict((k, v) for k, v in self.items())
+
+    def __deepcopy__(self, *args, **kwargs):
+        return self.__copy__()
+
+    def copy(self):
+        return self.__copy__()
+
+    def clear(self):
+        self._nested_value.clear()
+
+    def update(self, dict_=None, **kwargs):
+        if dict_:
+            for key, value in dict_.items():
+                self[key] = value
+
+        for key, value in kwargs.items():
+            self[key] = value
+
+
+class DecorateAttributes(dict):
+
+    def __init__(self, func):
+        super(DecorateAttributes, self).__init__()
+        self._func = func
+
+    def __getattr__(self, item):
+        try:
+            return getattr(self._actor, item)
+        except AttributeError:
+            return super(DecorateAttributes, self).__getattribute__(item)
+
+    def __call__(self, *args, **kwargs):
+        func_self = args[0]
+        self._actor = self._func(*args, **kwargs)
+        self._model = func_self.model
+        self.attributes = _Dict(self._actor, self._model)
+        return self

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 2182db5..f4e8813 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -21,72 +21,10 @@ import threading
 
 import aria
 from aria.utils import file
-from .common import BaseContext
+from . import common
 
 
-class _DecorateAttributes(object):
-
-    class _Attributes(object):
-        def __init__(self, model, actor):
-            self._model = model
-            self._actor = actor
-            self._attributes = actor.attributes
-            self._attr_cls = self._model.parameter.model_cls
-
-        def __getitem__(self, item):
-            return self._attributes[item].value
-
-        def __setitem__(self, key, value):
-            if key in self._attributes:
-                self._attributes[key].value = value
-                self._model.parameter.update(self._attributes[key])
-            else:
-                attr = self._attr_cls.wrap(key, value)
-                self._attributes[key] = attr
-                self._model.parameter.put(attr)
-
-        def update(self, dict_=None, **kwargs):
-            if dict_:
-                for key, value in dict_.items():
-                    self[key] = value
-
-            for key, value in kwargs.items():
-                self[key] = value
-
-        def keys(self):
-            for attr in self._attributes.values():
-                yield attr.unwrap()[0]
-
-        def values(self):
-            for attr in self._attributes.values():
-                yield attr.unwrap()[1]
-
-        def items(self):
-            for attr in self._attributes.values():
-                yield attr.unwrap()
-
-        def __iter__(self):
-            for attr in self._attributes.values():
-                yield attr.unwrap()[0]
-
-    def __init__(self, func):
-        self._func = func
-
-    def __getattr__(self, item):
-        try:
-            return getattr(self._actor, item)
-        except AttributeError:
-            return super(_DecorateAttributes, self).__getattribute__(item)
-
-    def __call__(self, *args, **kwargs):
-        func_self = args[0]
-        self._actor = self._func(*args, **kwargs)
-        self._model = func_self.model
-        self.attributes = self._Attributes(self._model, self._actor)
-        return self
-
-
-class BaseOperationContext(BaseContext):
+class BaseOperationContext(common.BaseContext):
     """
     Context object used during operation creation and execution
     """
@@ -167,7 +105,7 @@ class NodeOperationContext(BaseOperationContext):
     """
 
     @property
-    @_DecorateAttributes
+    @common.DecorateAttributes
     def node_template(self):
         """
         the node of the current operation
@@ -176,7 +114,7 @@ class NodeOperationContext(BaseOperationContext):
         return self.node.node_template
 
     @property
-    @_DecorateAttributes
+    @common.DecorateAttributes
     def node(self):
         """
         The node instance of the current operation
@@ -191,7 +129,7 @@ class RelationshipOperationContext(BaseOperationContext):
     """
 
     @property
-    @_DecorateAttributes
+    @common.DecorateAttributes
     def source_node_template(self):
         """
         The source node
@@ -200,7 +138,7 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.source_node.node_template
 
     @property
-    @_DecorateAttributes
+    @common.DecorateAttributes
     def source_node(self):
         """
         The source node instance
@@ -209,7 +147,7 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.relationship.source_node
 
     @property
-    @_DecorateAttributes
+    @common.DecorateAttributes
     def target_node_template(self):
         """
         The target node
@@ -218,7 +156,7 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.target_node.node_template
 
     @property
-    @_DecorateAttributes
+    @common.DecorateAttributes
     def target_node(self):
         """
         The target node instance

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 52a5312..4173c76 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -104,7 +104,11 @@ class CtxProxy(object):
         try:
             typed_request = json.loads(request)
             args = typed_request['args']
-            payload = _process_ctx_request(self.ctx, args)
+            try:
+                payload = _process_ctx_request(self.ctx, args)
+            except BaseException:
+                self.ctx.model.log._session.close()
+                raise
             result_type = 'result'
             if isinstance(payload, exceptions.ScriptException):
                 payload = dict(message=str(payload))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index c3962ed..f3a08ec 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -43,7 +43,6 @@ import jsonpickle
 
 import aria
 from aria.orchestrator.workflows.executor import base
-from aria.storage import instrumentation
 from aria.extension import process_executor
 from aria.utils import (
     imports,
@@ -82,7 +81,6 @@ class ProcessExecutor(base.BaseExecutor):
             'started': self._handle_task_started_request,
             'succeeded': self._handle_task_succeeded_request,
             'failed': self._handle_task_failed_request,
-            'apply_tracked_changes': self._handle_apply_tracked_changes_request
         }
 
         # Server socket used to accept task status messages from subprocesses
@@ -196,41 +194,13 @@ class ProcessExecutor(base.BaseExecutor):
     def _handle_task_started_request(self, task_id, **kwargs):
         self._task_started(self._tasks[task_id])
 
-    def _handle_task_succeeded_request(self, task_id, request, **kwargs):
+    def _handle_task_succeeded_request(self, task_id, **kwargs):
         task = self._remove_task(task_id)
-        try:
-            self._apply_tracked_changes(task, request)
-        except BaseException as e:
-            e.message += UPDATE_TRACKED_CHANGES_FAILED_STR
-            self._task_failed(task, exception=e)
-        else:
-            self._task_succeeded(task)
+        self._task_succeeded(task)
 
     def _handle_task_failed_request(self, task_id, request, **kwargs):
         task = self._remove_task(task_id)
-        try:
-            self._apply_tracked_changes(task, request)
-        except BaseException as e:
-            e.message += 'Task failed due to {0}.'.format(request['exception']) + \
-                         UPDATE_TRACKED_CHANGES_FAILED_STR
-            self._task_failed(
-                task, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info()))
-        else:
-            self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
-
-    def _handle_apply_tracked_changes_request(self, task_id, request, response):
-        task = self._tasks[task_id]
-        try:
-            self._apply_tracked_changes(task, request)
-        except BaseException as e:
-            response['exception'] = exceptions.wrap_if_needed(e)
-
-    @staticmethod
-    def _apply_tracked_changes(task, request):
-        instrumentation.apply_tracked_changes(
-            tracked_changes=request['tracked_changes'],
-            new_instances=request['new_instances'],
-            model=task.context.model)
+        self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
 
 
 def _send_message(connection, message):
@@ -278,28 +248,19 @@ class _Messenger(object):
         """Task started message"""
         self._send_message(type='started')
 
-    def succeeded(self, tracked_changes, new_instances):
+    def succeeded(self):
         """Task succeeded message"""
-        self._send_message(
-            type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances)
+        self._send_message(type='succeeded')
 
-    def failed(self, tracked_changes, new_instances, exception):
+    def failed(self, exception):
         """Task failed message"""
-        self._send_message(type='failed',
-                           tracked_changes=tracked_changes,
-                           new_instances=new_instances,
-                           exception=exception)
-
-    def apply_tracked_changes(self, tracked_changes, new_instances):
-        self._send_message(type='apply_tracked_changes',
-                           tracked_changes=tracked_changes,
-                           new_instances=new_instances)
+        self._send_message(type='failed', exception=exception)
 
     def closed(self):
         """Executor closed message"""
         self._send_message(type='closed')
 
-    def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None):
+    def _send_message(self, type, exception=None):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.connect(('localhost', self.port))
         try:
@@ -308,8 +269,6 @@ class _Messenger(object):
                 'task_id': self.task_id,
                 'exception': exceptions.wrap_if_needed(exception),
                 'traceback': exceptions.get_exception_as_string(*sys.exc_info()),
-                'tracked_changes': tracked_changes or {},
-                'new_instances': new_instances or {}
             })
             response = _recv_message(sock)
             response_exception = response.get('exception')
@@ -319,40 +278,6 @@ class _Messenger(object):
             sock.close()
 
 
-def _patch_ctx(ctx, messenger, instrument):
-    # model will be None only in tests that test the executor component directly
-    if not ctx.model:
-        return
-
-    # We arbitrarily select the ``node`` mapi to extract the session from it.
-    # could have been any other mapi just as well
-    session = ctx.model.node._session
-    original_refresh = session.refresh
-
-    def patched_refresh(target):
-        instrument.clear(target)
-        original_refresh(target)
-
-    def patched_commit():
-        messenger.apply_tracked_changes(instrument.tracked_changes,
-                                        instrument.new_instances_as_dict)
-        instrument.expunge_session()
-        instrument.clear()
-
-    def patched_rollback():
-        # Rollback is performed on parent process when commit fails
-        instrument.expunge_session()
-
-    # when autoflush is set to true (the default), refreshing an object will trigger
-    # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so
-    # far on the session. this is not the desired behavior in the subprocess
-    session.autoflush = False
-
-    session.commit = patched_commit
-    session.rollback = patched_rollback
-    session.refresh = patched_refresh
-
-
 def _main():
     arguments_json_path = sys.argv[1]
     with open(arguments_json_path) as f:
@@ -370,32 +295,24 @@ def _main():
     operation_inputs = arguments['operation_inputs']
     context_dict = arguments['context']
 
-    # This is required for the instrumentation work properly.
-    # See docstring of `remove_mutable_association_listener` for further details
-    modeling_types.remove_mutable_association_listener()
     try:
         ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
     except BaseException as e:
-        messenger.failed(exception=e, tracked_changes=None, new_instances=None)
+        messenger.failed(e)
         return
 
-    with instrumentation.track_changes(ctx.model) as instrument:
-        try:
-            messenger.started()
-            _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument)
-            task_func = imports.load_attribute(implementation)
-            aria.install_aria_extensions()
-            for decorate in process_executor.decorate():
-                task_func = decorate(task_func)
-            task_func(ctx=ctx, **operation_inputs)
-            messenger.succeeded(tracked_changes=instrument.tracked_changes,
-                                new_instances=instrument.new_instances_as_dict)
-        except BaseException as e:
-            messenger.failed(exception=e,
-                             tracked_changes=instrument.tracked_changes,
-                             new_instances=instrument.new_instances_as_dict)
-        finally:
-            instrument.expunge_session()
+    try:
+        messenger.started()
+        task_func = imports.load_attribute(implementation)
+        aria.install_aria_extensions()
+        for decorate in process_executor.decorate():
+            task_func = decorate(task_func)
+        task_func(ctx=ctx, **operation_inputs)
+        messenger.succeeded()
+    except BaseException as e:
+        ctx.model.log._session.close()
+        ctx.model.log._engine.dispose()
+        messenger.failed(e)
 
 if __name__ == '__main__':
     _main()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index a11bb28..eb5ff6c 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -34,7 +34,7 @@ _INSTRUMENTED = {
         _models.Task.status: str,
         _models.Node.attributes: collection,
         # TODO: add support for pickled type
-        # _models.Parameter._value: some_type
+        _models.Parameter._value: lambda x: x
     },
     'new': (_models.Log, ),
 
@@ -106,7 +106,6 @@ class _Instrumentation(object):
         for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
             self._register_attribute_listener(instrumented_attribute=instrumented_attribute,
                                               attribute_type=attribute_type)
-            # TODO: Revisit this, why not?
             if not isinstance(attribute_type, _Collection):
                 instrumented_class = instrumented_attribute.parent.entity
                 instrumented_class_attributes = instrumented_attribute_classes.setdefault(
@@ -146,6 +145,7 @@ class _Instrumentation(object):
 
     def _register_append_to_attribute_listener(self, collection_attr):
         def listener(target, value, initiator):
+            import pydevd; pydevd.settrace('localhost', suspend=False)
             tracked_instances = self.tracked_changes.setdefault(target.__modelname__, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
             collection_attr = tracked_attributes.setdefault(initiator.key, [])
@@ -161,13 +161,11 @@ class _Instrumentation(object):
 
     def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
         def listener(target, value, *_):
+            import pydevd; pydevd.settrace('localhost', suspend=False)
             mapi_name = target.__modelname__
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
-            if value is None:
-                current = None
-            else:
-                current = copy.deepcopy(attribute_type(value))
+            current = copy.deepcopy(attribute_type(value)) if value else None
             tracked_attributes[instrumented_attribute.key] = _Value(_STUB, current)
             return current
         listener_args = (instrumented_attribute, 'set', listener)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 6802593..ca5154c 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -25,7 +25,9 @@ from aria import (
     operation,
 )
 from aria.orchestrator import context
+from aria.orchestrator.context import common
 from aria.orchestrator.workflows import api
+from aria.modeling.models import Parameter
 
 import tests
 from tests import (
@@ -263,7 +265,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
 
 
 @pytest.fixture(params=[
-    (thread.ThreadExecutor, {}),
+    # (thread.ThreadExecutor, {}),
     (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
 ])
 def executor(request):
@@ -495,4 +497,124 @@ def attribute_altering_operation(ctx, attributes_dict, **_):
 def attribute_consuming_operation(ctx, holder_path, **_):
     holder = helpers.FilesystemDataHolder(holder_path)
     ctx.target_node.attributes.update(ctx.source_node.attributes)
-    holder.update(ctx.source_node.attributes)
+    holder.update(**ctx.source_node.attributes)
+
+
+class MockActor(object):
+    def __init__(self):
+        self.attributes = {}
+
+
+class MockModel(object):
+
+    def __init__(self):
+        self.parameter = type('MockModel', (object, ), {'model_cls': Parameter,
+                                                        'put': lambda *args, **kwargs: None,
+                                                        'update': lambda *args, **kwargs: None})()
+
+
+class TestDict():
+
+    @pytest.fixture
+    def actor(self):
+        return MockActor()
+
+    @pytest.fixture
+    def model(self):
+        return MockModel()
+
+    def test_keys(self, model, actor):
+        dict_ = common._Dict(actor, model)
+        actor.attributes.update({
+            'key1': Parameter.wrap('key1', 'value1'),
+            'key2': Parameter.wrap('key1', 'value2')}
+        )
+        assert sorted(dict_.keys()) == sorted(['key1', 'key2'])
+
+    def test_values(self, model, actor):
+        dict_ = common._Dict(actor, model)
+        actor.attributes.update({
+            'key1': Parameter.wrap('key1', 'value1'),
+            'key2': Parameter.wrap('key1', 'value2')}
+        )
+        assert sorted(dict_.values()) == sorted(['value1', 'value2'])
+
+    def test_items(self, actor, model):
+        dict_ = common._Dict(actor, model)
+        actor.attributes.update({
+            'key1': Parameter.wrap('key1', 'value1'),
+            'key2': Parameter.wrap('key1', 'value2')}
+        )
+        assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')])
+
+    def test_iter(self, actor, model):
+        dict_ = common._Dict(actor, model)
+        actor.attributes.update({
+            'key1': Parameter.wrap('key1', 'value1'),
+            'key2': Parameter.wrap('key1', 'value2')}
+        )
+        assert sorted(list(dict_)) == sorted(['key1', 'key2'])
+
+    def test_bool(self, actor, model):
+        dict_ = common._Dict(actor, model)
+        assert not dict_
+        actor.attributes.update({
+            'key1': Parameter.wrap('key1', 'value1'),
+            'key2': Parameter.wrap('key1', 'value2')}
+        )
+        assert dict_
+
+    def test_set_item(self, actor, model):
+        dict_ = common._Dict(actor, model)
+        dict_['key1'] = Parameter.wrap('key1', 'value1')
+        assert 'key1' in dict_
+        assert isinstance(dict_._attributes['key1'], Parameter)
+        assert dict_['key1'] == 'value1'
+
+        dict_['key1'] = {}
+        dict_['key1']['inner_key'] = 'value2'
+
+        assert isinstance(dict_._attributes['key1'], Parameter)
+        assert len(dict_) == 1
+        assert 'inner_key' in dict_['key1']
+        assert isinstance(dict_['key1'], common._Dict)
+        assert dict_['key1']['inner_key'] == 'value2'
+
+    def test_get_item(self, actor, model):
+        dict_ = common._Dict(actor, model)
+        dict_['key1'] = Parameter.wrap('key1', 'value1')
+
+        assert isinstance(dict_._attributes['key1'], Parameter)
+
+    def test_update(self, actor, model):
+        dict_ = common._Dict(actor, model)
+        dict_['key1'] = 'value1'
+
+        new_dict = {'key2': 'value2'}
+        dict_.update(new_dict)
+        assert len(dict_) == 2
+        assert dict_['key2'] == 'value2'
+        assert isinstance(dict_._attributes['key2'], Parameter)
+
+        new_dict = {}
+        new_dict.update(dict_)
+        assert new_dict['key1'] == dict_['key1']
+
+    def test_copy(self, actor, model):
+        dict_ = common._Dict(actor, model)
+        dict_['key1'] = 'value1'
+
+        new_dict = dict_.copy()
+        assert new_dict is not dict_
+        assert new_dict == dict_
+
+        dict_['key1'] = 'value2'
+        assert new_dict['key1'] == 'value1'
+        assert dict_['key1'] == 'value2'
+
+    def test_clear(self, actor, model):
+        dict_ = common._Dict(actor, model)
+        dict_['key1'] = 'value1'
+        dict_.clear()
+
+        assert len(dict_) == 0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 09d0499..d9115e1 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -43,26 +43,26 @@ class TestLocalRunScript(object):
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties map.key value
+            ctx node attributes map.key value
             ''',
             windows_script='''
-            ctx node runtime-properties map.key value
+            ctx node attributes map.key value
         ''')
         props = self._run(
             executor, workflow_context,
             script_path=script_path)
-        assert props['map']['key'] == 'value'
+        assert props['map'].value['key'] == 'value'
 
     def test_process_env(self, executor, workflow_context, tmpdir):
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties map.key1 $key1
-            ctx node runtime-properties map.key2 $key2
+            ctx node attributes map.key1 $key1
+            ctx node attributes map.key2 $key2
             ''',
             windows_script='''
-            ctx node runtime-properties map.key1 %key1%
-            ctx node runtime-properties map.key2 %key2%
+            ctx node attributes map.key1 %key1%
+            ctx node attributes map.key2 %key2%
         ''')
         props = self._run(
             executor, workflow_context,
@@ -73,7 +73,7 @@ class TestLocalRunScript(object):
                     'key2': 'value2'
                 }
             })
-        p_map = props['map']
+        p_map = props['map'].value
         assert p_map['key1'] == 'value1'
         assert p_map['key2'] == 'value2'
 
@@ -81,10 +81,10 @@ class TestLocalRunScript(object):
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties map.cwd $PWD
+            ctx node attributes map.cwd $PWD
             ''',
             windows_script='''
-            ctx node runtime-properties map.cwd %CD%
+            ctx node attributes map.cwd %CD%
             ''')
         tmpdir = str(tmpdir)
         props = self._run(
@@ -93,11 +93,11 @@ class TestLocalRunScript(object):
             process={
                 'cwd': tmpdir
             })
-        p_map = props['map']
+        p_map = props['map'].value
         assert p_map['cwd'] == tmpdir
 
     def test_process_command_prefix(self, executor, workflow_context, tmpdir):
-        use_ctx = 'ctx node runtime-properties map.key value'
+        use_ctx = 'ctx node attributes map.key value'
         python_script = ['import subprocess',
                          'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)]
         python_script = '\n'.join(python_script)
@@ -114,19 +114,19 @@ class TestLocalRunScript(object):
                 'env': {'TEST_KEY': 'value'},
                 'command_prefix': 'python'
             })
-        p_map = props['map']
+        p_map = props['map'].value
         assert p_map['key'] == 'value'
 
     def test_process_args(self, executor, workflow_context, tmpdir):
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties map.arg1 "$1"
-            ctx node runtime-properties map.arg2 $2
+            ctx node attributes map.arg1 "$1"
+            ctx node attributes map.arg2 $2
             ''',
             windows_script='''
-            ctx node runtime-properties map.arg1 %1
-            ctx node runtime-properties map.arg2 %2
+            ctx node attributes map.arg1 %1
+            ctx node attributes map.arg2 %2
             ''')
         props = self._run(
             executor, workflow_context,
@@ -134,8 +134,8 @@ class TestLocalRunScript(object):
             process={
                 'args': ['"arg with spaces"', 'arg2']
             })
-        assert props['map']['arg1'] == 'arg with spaces'
-        assert props['map']['arg2'] == 'arg2'
+        assert props['map'].value['arg1'] == 'arg with spaces'
+        assert props['map'].value['arg2'] == 'arg2'
 
     def test_no_script_path(self, executor, workflow_context):
         exception = self._run_and_get_task_exception(
@@ -187,7 +187,7 @@ class TestLocalRunScript(object):
         script = '''
 from aria.orchestrator.execution_plugin import ctx, inputs
 if __name__ == '__main__':
-    ctx.node.runtime_properties['key'] = inputs['key']
+    ctx.node.attributes['key'] = inputs['key']
 '''
         suffix = '.py'
         script_path = self._create_script(
@@ -200,7 +200,7 @@ if __name__ == '__main__':
             executor, workflow_context,
             script_path=script_path,
             inputs={'key': 'value'})
-        assert props['key'] == 'value'
+        assert props['key'].value == 'value'
 
     @pytest.mark.parametrize(
         'value', ['string-value', [1, 2, 3], 999, 3.14, False,
@@ -209,16 +209,17 @@ if __name__ == '__main__':
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties key "${input_as_env_var}"
+            ctx node attributes key "${input_as_env_var}"
             ''',
             windows_script='''
-            ctx node runtime-properties key "%input_as_env_var%"
+            ctx node attributes key "%input_as_env_var%"
         ''')
         props = self._run(
             executor, workflow_context,
             script_path=script_path,
             env_var=value)
-        expected = props['key'] if isinstance(value, basestring) else json.loads(props['key'])
+        value = props['key'].value
+        expected = value if isinstance(value, basestring) else json.loads(value)
         assert expected == value
 
     @pytest.mark.parametrize('value', ['override', {'key': 'value'}])
@@ -227,10 +228,10 @@ if __name__ == '__main__':
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties key "${input_as_env_var}"
+            ctx node attributes key "${input_as_env_var}"
             ''',
             windows_script='''
-            ctx node runtime-properties key "%input_as_env_var%"
+            ctx node attributes key "%input_as_env_var%"
         ''')
 
         props = self._run(
@@ -242,17 +243,18 @@ if __name__ == '__main__':
                     'input_as_env_var': value
                 }
             })
-        expected = props['key'] if isinstance(value, basestring) else json.loads(props['key'])
+        value = props['key'].value
+        expected = value if isinstance(value, basestring) else json.loads(value)
         assert expected == value
 
     def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir):
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties nonexistent
+            ctx node attributes nonexistent
             ''',
             windows_script='''
-            ctx node runtime-properties nonexistent
+            ctx node attributes nonexistent
         ''')
         exception = self._run_and_get_task_exception(
             executor, workflow_context,
@@ -266,10 +268,10 @@ if __name__ == '__main__':
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx -j instance runtime-properties nonexistent
+            ctx -j instance attributes nonexistent
             ''',
             windows_script='''
-            ctx -j instance runtime-properties nonexistent
+            ctx -j instance attributes nonexistent
             ''')
         exception = self._run_and_get_task_exception(
             executor, workflow_context,
@@ -502,7 +504,7 @@ if __name__ == '__main__':
             tasks_graph=tasks_graph)
         eng.execute()
         return workflow_context.model.node.get_by_name(
-            mock.models.DEPENDENCY_NODE_NAME).runtime_properties
+            mock.models.DEPENDENCY_NODE_NAME).attributes
 
     @pytest.fixture
     def executor(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 50ca7f5..b1b8251 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -100,7 +100,7 @@ class TestOperationTask(object):
         storage_task = ctx.model.task.get_by_name(core_task.name)
         assert storage_task.plugin is storage_plugin
         assert storage_task.execution_name == ctx.execution.name
-        assert storage_task.actor == core_task.context.node
+        assert storage_task.actor == core_task.context.node._actor
         assert core_task.model_task == storage_task
         assert core_task.name == api_task.name
         assert core_task.implementation == api_task.implementation

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 1dbfae1..8ed2f82 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -31,6 +31,8 @@ from tests import mock
 from tests import storage
 
 
+# TODO: rethink this entire module
+
 def test_concurrent_modification_on_task_succeeded(context, executor, lock_files):
     _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True)
 
@@ -62,8 +64,7 @@ def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value):
     if not first:
         try:
             ctx.model.node.update(node)
-        except StorageError as e:
-            assert 'Version conflict' in str(e)
+        except StorageError:
             ctx.model.node.refresh(node)
         else:
             raise RuntimeError('Unexpected')
@@ -118,8 +119,8 @@ def _test(context, executor, lock_files, func, expected_failure):
         except ExecutorException:
             pass
 
-    props = _node(context).runtime_properties
-    assert props[key] == first_value
+    props = _node(context).attributes
+    assert props[key].value == first_value
 
     exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
     if expected_failure:
@@ -151,7 +152,6 @@ def lock_files(tmpdir):
 
 
 def _concurrent_update(lock_files, node, key, first_value, second_value):
-
     locker1 = fasteners.InterProcessLock(lock_files[0])
     locker2 = fasteners.InterProcessLock(lock_files[1])
 
@@ -165,7 +165,7 @@ def _concurrent_update(lock_files, node, key, first_value, second_value):
     else:
         locker2.acquire()
 
-    node.runtime_properties[key] = first_value if first else second_value
+    node.attributes[key] = first_value if first else second_value
 
     if first:
         locker1.release()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 878ac24..30b23ed 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -56,7 +56,7 @@ def test_decorate_extension(context, executor):
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
     eng.execute()
-    out = get_node(context).runtime_properties['out']
+    out = get_node(context).attributes.get('out').value
     assert out['wrapper_inputs'] == inputs
     assert out['function_inputs'] == inputs
 
@@ -67,7 +67,7 @@ class MockProcessExecutorExtension(object):
     def decorate(self):
         def decorator(function):
             def wrapper(ctx, **operation_inputs):
-                ctx.node.runtime_properties['out'] = {'wrapper_inputs': operation_inputs}
+                ctx.node.attributes['out'] = {'wrapper_inputs': operation_inputs}
                 function(ctx=ctx, **operation_inputs)
             return wrapper
         return decorator
@@ -75,7 +75,7 @@ class MockProcessExecutorExtension(object):
 
 @operation
 def _mock_operation(ctx, **operation_inputs):
-    ctx.node.runtime_properties['out']['function_inputs'] = operation_inputs
+    ctx.node.attributes['out']['function_inputs'] = operation_inputs
 
 
 @pytest.fixture

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 4fbe9c1..e2c7f83 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -28,7 +28,7 @@ from tests import mock
 from tests import storage
 
 
-_TEST_RUNTIME_PROPERTIES = {
+_TEST_ATTRIBUTES = {
     'some': 'values', 'that': 'are', 'most': 'likely', 'only': 'set', 'here': 'yo'
 }
 
@@ -46,12 +46,13 @@ def test_track_changes_of_failed_operation(context, executor):
 
 def _assert_tracked_changes_are_applied(context):
     instance = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-    assert instance.runtime_properties == _TEST_RUNTIME_PROPERTIES
+    assert all(instance.attributes[key].value == value
+               for key, value in _TEST_ATTRIBUTES.items())
 
 
-def _update_runtime_properties(context):
-    context.node.runtime_properties.clear()
-    context.node.runtime_properties.update(_TEST_RUNTIME_PROPERTIES)
+def _update_attributes(context):
+    context.node.attributes.clear()
+    context.node.attributes.update(_TEST_ATTRIBUTES)
 
 
 def test_refresh_state_of_tracked_attributes(context, executor):
@@ -66,11 +67,9 @@ def test_apply_tracked_changes_during_an_operation(context, executor):
         'changed_but_refreshed': {'some': 'newer', 'properties': 'right there'}
     }
 
-    expected_initial = context.model.node.get_by_name(
-        mock.models.DEPENDENCY_NODE_NAME).runtime_properties
-
-    out = _run_workflow(context=context, executor=executor, op_func=_mock_updating_operation,
-                        inputs=inputs)
+    expected_initial = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes
+    out = _run_workflow(
+        context=context, executor=executor, op_func=_mock_updating_operation, inputs=inputs)
 
     expected_after_update = expected_initial.copy()
     expected_after_update.update(inputs['committed']) # pylint: disable=no-member
@@ -109,42 +108,42 @@ def _run_workflow(context, executor, op_func, inputs=None):
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
     eng.execute()
-    return context.model.node.get_by_name(
-        mock.models.DEPENDENCY_NODE_NAME).runtime_properties.get('out')
+    out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
+    return out.value if out else None
 
 
 @operation
 def _mock_success_operation(ctx):
-    _update_runtime_properties(ctx)
+    _update_attributes(ctx)
 
 
 @operation
 def _mock_fail_operation(ctx):
-    _update_runtime_properties(ctx)
+    _update_attributes(ctx)
     raise RuntimeError
 
 
 @operation
 def _mock_refreshing_operation(ctx):
-    out = {'initial': copy.deepcopy(ctx.node.runtime_properties)}
-    ctx.node.runtime_properties.update({'some': 'new', 'properties': 'right here'})
-    out['after_change'] = copy.deepcopy(ctx.node.runtime_properties)
+    out = {'initial': copy.deepcopy(ctx.node.attributes)}
+    ctx.node.attributes.update({'some': 'new', 'properties': 'right here'})
+    out['after_change'] = copy.deepcopy(ctx.node.attributes)
     ctx.model.node.refresh(ctx.node)
-    out['after_refresh'] = copy.deepcopy(ctx.node.runtime_properties)
-    ctx.node.runtime_properties['out'] = out
+    out['after_refresh'] = copy.deepcopy(ctx.node.attributes)
+    ctx.node.attributes['out'] = out
 
 
 @operation
 def _mock_updating_operation(ctx, committed, changed_but_refreshed):
-    out = {'initial': copy.deepcopy(ctx.node.runtime_properties)}
-    ctx.node.runtime_properties.update(committed)
+    out = {'initial': copy.deepcopy(ctx.node.attributes)}
+    ctx.node.attributes.update(committed)
     ctx.model.node.update(ctx.node)
-    out['after_update'] = copy.deepcopy(ctx.node.runtime_properties)
-    ctx.node.runtime_properties.update(changed_but_refreshed)
-    out['after_change'] = copy.deepcopy(ctx.node.runtime_properties)
+    out['after_update'] = copy.deepcopy(ctx.node.attributes)
+    ctx.node.attributes.update(changed_but_refreshed)
+    out['after_change'] = copy.deepcopy(ctx.node.attributes)
     ctx.model.node.refresh(ctx.node)
-    out['after_refresh'] = copy.deepcopy(ctx.node.runtime_properties)
-    ctx.node.runtime_properties['out'] = out
+    out['after_refresh'] = copy.deepcopy(ctx.node.attributes)
+    ctx.node.attributes['out'] = out
 
 
 def _operation_mapping(func):