You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by em...@apache.org on 2017/05/11 16:31:48 UTC

[3/6] incubator-ariatosca git commit: ARIA-213 Sporadic tests failures over locked database issue

ARIA-213 Sporadic tests failures over locked database issue

Move from 2 different sessions - one for the log, and the other for general model operations,
to one single session, while utilizing the keep tracking of changes mechanism for both logs and node/task states.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2ee06b8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2ee06b8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2ee06b8a

Branch: refs/heads/ARIA-139-attributes
Commit: 2ee06b8a6abe79f429458c7dbc5f9e1c31aec589
Parents: 6864d42
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue May 9 17:24:31 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu May 11 17:24:15 2017 +0300

----------------------------------------------------------------------
 aria/logger.py                                  | 25 +----
 aria/orchestrator/context/common.py             | 13 +--
 .../workflows/core/events_handler.py            |  4 +-
 aria/orchestrator/workflows/executor/process.py | 51 ++++++----
 aria/storage/instrumentation.py                 | 97 +++++++++++++++++---
 .../orchestrator/workflows/executor/__init__.py | 21 ++++-
 .../workflows/executor/test_executor.py         | 36 ++++++--
 .../workflows/executor/test_process_executor.py | 13 ++-
 tests/storage/test_instrumentation.py           | 37 +++++++-
 9 files changed, 219 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 97d3878..bd7ed4e 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -114,17 +114,11 @@ def create_console_log_handler(level=logging.DEBUG, formatter=None):
     return console
 
 
-def create_sqla_log_handler(session, engine, log_cls, execution_id, level=logging.DEBUG):
+def create_sqla_log_handler(model, log_cls, execution_id, level=logging.DEBUG):
 
     # This is needed since the engine and session are entirely new we need to reflect the db
     # schema of the logging model into the engine and session.
-    log_cls.__table__.create(bind=engine, checkfirst=True)
-
-    return _SQLAlchemyHandler(session=session,
-                              engine=engine,
-                              log_cls=log_cls,
-                              execution_id=execution_id,
-                              level=level)
+    return _SQLAlchemyHandler(model=model, log_cls=log_cls, execution_id=execution_id, level=level)
 
 
 class _DefaultConsoleFormat(logging.Formatter):
@@ -168,10 +162,9 @@ def create_file_log_handler(
 
 class _SQLAlchemyHandler(logging.Handler):
 
-    def __init__(self, session, engine, log_cls, execution_id, **kwargs):
+    def __init__(self, model, log_cls, execution_id, **kwargs):
         logging.Handler.__init__(self, **kwargs)
-        self._session = session
-        self._engine = engine
+        self._model = model
         self._cls = log_cls
         self._execution_id = execution_id
 
@@ -188,15 +181,7 @@ class _SQLAlchemyHandler(logging.Handler):
             # Not mandatory.
             traceback=getattr(record, 'traceback', None)
         )
-        self._session.add(log)
-
-        try:
-            self._session.commit()
-        except BaseException:
-            self._session.rollback()
-            raise
-        finally:
-            self._session.close()
+        self._model.log.put(log)
 
 
 _default_file_formatter = logging.Formatter(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 64ef9a4..0854a27 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -79,13 +79,9 @@ class BaseContext(object):
             self.logger.addHandler(self._get_sqla_handler())
 
     def _get_sqla_handler(self):
-        api_kwargs = {}
-        if self._model._initiator:
-            api_kwargs.update(self._model._initiator(**self._model._initiator_kwargs))
-        api_kwargs.update(**self._model._api_kwargs)
-        return aria_logger.create_sqla_log_handler(log_cls=modeling.models.Log,
-                                                   execution_id=self._execution_id,
-                                                   **api_kwargs)
+        return aria_logger.create_sqla_log_handler(model=self._model,
+                                                   log_cls=modeling.models.Log,
+                                                   execution_id=self._execution_id)
 
     def __repr__(self):
         return (
@@ -196,7 +192,6 @@ class BaseContext(object):
 
     def _render_resource(self, resource_content, variables):
         variables = variables or {}
-        if 'ctx' not in variables:
-            variables['ctx'] = self
+        variables.setdefault('ctx', self)
         resource_template = jinja2.Template(resource_content)
         return resource_template.render(variables)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index f3e4e7e..669fb43 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -40,7 +40,7 @@ def _task_started(task, *args, **kwargs):
     with task._update():
         task.started_at = datetime.utcnow()
         task.status = task.STARTED
-        _update_node_state_if_necessary(task, is_transitional=True)
+    _update_node_state_if_necessary(task, is_transitional=True)
 
 
 @events.on_failure_task_signal.connect
@@ -74,7 +74,7 @@ def _task_succeeded(task, *args, **kwargs):
         task.ended_at = datetime.utcnow()
         task.status = task.SUCCESS
 
-        _update_node_state_if_necessary(task)
+    _update_node_state_if_necessary(task)
 
 
 @events.start_workflow_signal.connect

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index e464f7d..824c4e1 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -229,6 +229,7 @@ class ProcessExecutor(base.BaseExecutor):
     def _apply_tracked_changes(task, request):
         instrumentation.apply_tracked_changes(
             tracked_changes=request['tracked_changes'],
+            new_instances=request['new_instances'],
             model=task.context.model)
 
 
@@ -277,22 +278,28 @@ class _Messenger(object):
         """Task started message"""
         self._send_message(type='started')
 
-    def succeeded(self, tracked_changes):
+    def succeeded(self, tracked_changes, new_instances):
         """Task succeeded message"""
-        self._send_message(type='succeeded', tracked_changes=tracked_changes)
+        self._send_message(
+            type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances)
 
-    def failed(self, tracked_changes, exception):
+    def failed(self, tracked_changes, new_instances, exception):
         """Task failed message"""
-        self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception)
+        self._send_message(type='failed',
+                           tracked_changes=tracked_changes,
+                           new_instances=new_instances,
+                           exception=exception)
 
-    def apply_tracked_changes(self, tracked_changes):
-        self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes)
+    def apply_tracked_changes(self, tracked_changes, new_instances):
+        self._send_message(type='apply_tracked_changes',
+                           tracked_changes=tracked_changes,
+                           new_instances=new_instances)
 
     def closed(self):
         """Executor closed message"""
         self._send_message(type='closed')
 
-    def _send_message(self, type, tracked_changes=None, exception=None):
+    def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.connect(('localhost', self.port))
         try:
@@ -301,7 +308,8 @@ class _Messenger(object):
                 'task_id': self.task_id,
                 'exception': exceptions.wrap_if_needed(exception),
                 'traceback': exceptions.get_exception_as_string(*sys.exc_info()),
-                'tracked_changes': tracked_changes
+                'tracked_changes': tracked_changes or {},
+                'new_instances': new_instances or {}
             })
             response = _recv_message(sock)
             response_exception = response.get('exception')
@@ -311,7 +319,7 @@ class _Messenger(object):
             sock.close()
 
 
-def _patch_session(ctx, messenger, instrument):
+def _patch_ctx(ctx, messenger, instrument):
     # model will be None only in tests that test the executor component directly
     if not ctx.model:
         return
@@ -326,12 +334,13 @@ def _patch_session(ctx, messenger, instrument):
         original_refresh(target)
 
     def patched_commit():
-        messenger.apply_tracked_changes(instrument.tracked_changes)
+        messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances)
+        instrument.expunge_session()
         instrument.clear()
 
     def patched_rollback():
         # Rollback is performed on parent process when commit fails
-        pass
+        instrument.expunge_session()
 
     # when autoflush is set to true (the default), refreshing an object will trigger
     # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so
@@ -363,21 +372,29 @@ def _main():
     # This is required for the instrumentation work properly.
     # See docstring of `remove_mutable_association_listener` for further details
     modeling_types.remove_mutable_association_listener()
+    try:
+        ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
+    except BaseException as e:
+        messenger.failed(exception=e, tracked_changes=None, new_instances=None)
+        return
 
-    with instrumentation.track_changes() as instrument:
+    with instrumentation.track_changes(ctx.model) as instrument:
         try:
             messenger.started()
-            ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
-            _patch_session(ctx=ctx, messenger=messenger, instrument=instrument)
+            _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument)
             task_func = imports.load_attribute(implementation)
             aria.install_aria_extensions()
             for decorate in process_executor.decorate():
                 task_func = decorate(task_func)
             task_func(ctx=ctx, **operation_inputs)
-            messenger.succeeded(tracked_changes=instrument.tracked_changes)
+            messenger.succeeded(tracked_changes=instrument.tracked_changes,
+                                new_instances=instrument.new_instances)
         except BaseException as e:
-            messenger.failed(exception=e, tracked_changes=instrument.tracked_changes)
-
+            messenger.failed(exception=e,
+                             tracked_changes=instrument.tracked_changes,
+                             new_instances=instrument.new_instances)
+        finally:
+            instrument.expunge_session()
 
 if __name__ == '__main__':
     _main()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index cf2a365..390f933 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -13,9 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
 import copy
 import json
+import os
 
 import sqlalchemy.event
 
@@ -26,11 +26,19 @@ from ..storage.exceptions import StorageError
 _VERSION_ID_COL = 'version'
 _STUB = object()
 _INSTRUMENTED = {
-    _models.Node.runtime_properties: dict
+    'modified': {
+        _models.Node.runtime_properties: dict,
+        _models.Node.state: str,
+        _models.Task.status: str,
+    },
+    'new': (_models.Log, )
+
 }
 
+_NEW_INSTANCE = 'NEW_INSTANCE'
+
 
-def track_changes(instrumented=None):
+def track_changes(model=None, instrumented=None):
     """Track changes in the specified model columns
 
     This call will register event listeners using sqlalchemy's event mechanism. The listeners
@@ -50,32 +58,78 @@ def track_changes(instrumented=None):
     will then call ``apply_tracked_changes()`` that resides in this module as well.
     At that point, the changes will actually be written back to the database.
 
+    :param model: the model storage. it should hold a mapi for each model. the session of each mapi
+    is needed to setup events
     :param instrumented: A dict from model columns to their python native type
     :return: The instrumentation context
     """
-    return _Instrumentation(instrumented or _INSTRUMENTED)
+    return _Instrumentation(model, instrumented or _INSTRUMENTED)
 
 
 class _Instrumentation(object):
 
-    def __init__(self, instrumented):
+    def __init__(self, model, instrumented):
         self.tracked_changes = {}
+        self.new_instances = {}
         self.listeners = []
+        self._instances_to_expunge = []
+        self._model = model
         self._track_changes(instrumented)
 
+    @property
+    def _new_instance_id(self):
+        return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE,
+                                         index=len(self._instances_to_expunge))
+
+    def expunge_session(self):
+        for new_instance in self._instances_to_expunge:
+            self._get_session_from_model(new_instance.__tablename__).expunge(new_instance)
+
+    def _get_session_from_model(self, tablename):
+        mapi = getattr(self._model, tablename, None)
+        if mapi:
+            return mapi._session
+        raise StorageError("Could not retrieve session for {0}".format(tablename))
+
     def _track_changes(self, instrumented):
-        instrumented_classes = {}
-        for instrumented_attribute, attribute_type in instrumented.items():
+        instrumented_attribute_classes = {}
+        # Track any newly-set attributes.
+        for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
             self._register_set_attribute_listener(
                 instrumented_attribute=instrumented_attribute,
                 attribute_type=attribute_type)
             instrumented_class = instrumented_attribute.parent.entity
-            instrumented_class_attributes = instrumented_classes.setdefault(instrumented_class, {})
+            instrumented_class_attributes = instrumented_attribute_classes.setdefault(
+                instrumented_class, {})
             instrumented_class_attributes[instrumented_attribute.key] = attribute_type
-        for instrumented_class, instrumented_attributes in instrumented_classes.items():
-            self._register_instance_listeners(
-                instrumented_class=instrumented_class,
-                instrumented_attributes=instrumented_attributes)
+
+        # Track any global instance update such as 'refresh' or 'load'
+        for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items():
+            self._register_instance_listeners(instrumented_class=instrumented_class,
+                                              instrumented_attributes=instrumented_attributes)
+
+        # Track any newly created instances.
+        for instrumented_class in instrumented.get('new', {}):
+            self._register_new_instance_listener(instrumented_class)
+
+    def _register_new_instance_listener(self, instrumented_class):
+        if self._model is None:
+            raise StorageError("In order to keep track of new instances, a ctx is needed")
+
+        def listener(_, instance):
+            if not isinstance(instance, instrumented_class):
+                return
+            self._instances_to_expunge.append(instance)
+            tracked_instances = self.new_instances.setdefault(instance.__modelname__, {})
+            tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {})
+            instance_as_dict = instance.to_dict()
+            instance_as_dict.update((k, getattr(instance, k))
+                                    for k in getattr(instance, '__private_fields__', []))
+            tracked_attributes.update(instance_as_dict)
+        session = self._get_session_from_model(instrumented_class.__tablename__)
+        listener_args = (session, 'after_attach', listener)
+        sqlalchemy.event.listen(*listener_args)
+        self.listeners.append(listener_args)
 
     def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
         def listener(target, value, *_):
@@ -125,6 +179,9 @@ class _Instrumentation(object):
         else:
             self.tracked_changes.clear()
 
+        self.new_instances.clear()
+        self._instances_to_expunge = []
+
     def restore(self):
         """Remove all listeners registered by this instrumentation"""
         for listener_args in self.listeners:
@@ -160,7 +217,7 @@ class _Value(object):
         return {'initial': self.initial, 'current': self.current}.copy()
 
 
-def apply_tracked_changes(tracked_changes, model):
+def apply_tracked_changes(tracked_changes, new_instances, model):
     """Write tracked changes back to the database using provided model storage
 
     :param tracked_changes: The ``tracked_changes`` attribute of the instrumentation context
@@ -169,6 +226,7 @@ def apply_tracked_changes(tracked_changes, model):
     """
     successfully_updated_changes = dict()
     try:
+        # handle instance updates
         for mapi_name, tracked_instances in tracked_changes.items():
             successfully_updated_changes[mapi_name] = dict()
             mapi = getattr(model, mapi_name)
@@ -177,18 +235,27 @@ def apply_tracked_changes(tracked_changes, model):
                 instance = None
                 for attribute_name, value in tracked_attributes.items():
                     if value.initial != value.current:
-                        if not instance:
-                            instance = mapi.get(instance_id)
+                        instance = instance or mapi.get(instance_id)
                         setattr(instance, attribute_name, value.current)
                 if instance:
                     _validate_version_id(instance, mapi)
                     mapi.update(instance)
                     successfully_updated_changes[mapi_name][instance_id] = [
                         v.dict for v in tracked_attributes.values()]
+
+        # Handle new instances
+        for mapi_name, new_instance in new_instances.items():
+            successfully_updated_changes[mapi_name] = dict()
+            mapi = getattr(model, mapi_name)
+            for new_instance_kwargs in new_instance.values():
+                instance = mapi.model_cls(**new_instance_kwargs)
+                mapi.put(instance)
+                successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs
     except BaseException:
         for key, value in successfully_updated_changes.items():
             if not value:
                 del successfully_updated_changes[key]
+        # TODO: if the successful has _STUB, the logging fails because it can't serialize the object
         model.logger.error(
             'Registering all the changes to the storage has failed. {0}'
             'The successful updates were: {0} '

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index cedcc5f..8ad8edb 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -17,6 +17,7 @@ import logging
 from collections import namedtuple
 from contextlib import contextmanager
 
+import aria
 from aria.modeling import models
 
 
@@ -24,7 +25,7 @@ class MockTask(object):
 
     INFINITE_RETRIES = models.Task.INFINITE_RETRIES
 
-    def __init__(self, implementation, inputs=None, plugin=None):
+    def __init__(self, implementation, inputs=None, plugin=None, storage=None):
         self.implementation = self.name = implementation
         self.plugin_fk = plugin.id if plugin else None
         self.plugin = plugin or None
@@ -33,7 +34,7 @@ class MockTask(object):
         self.exception = None
         self.id = str(uuid.uuid4())
         self.logger = logging.getLogger()
-        self.context = MockContext()
+        self.context = MockContext(storage)
         self.attempts_count = 1
         self.max_attempts = 1
         self.ignore_failure = False
@@ -52,14 +53,24 @@ class MockTask(object):
 
 class MockContext(object):
 
-    def __init__(self):
+    def __init__(self, storage=None):
         self.logger = logging.getLogger('mock_logger')
         self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
-        self.serialization_dict = {'context_cls': self.__class__, 'context': {}}
+        self.model = storage
+
+    @property
+    def serialization_dict(self):
+        if self.model:
+            return {'context': self.model.serialization_dict, 'context_cls': self.__class__}
+        else:
+            return {'context_cls': self.__class__, 'context': {}}
 
     def __getattr__(self, item):
         return None
 
     @classmethod
     def deserialize_from_dict(cls, **kwargs):
-        return cls()
+        if kwargs:
+            return cls(storage=aria.application_model_storage(**kwargs))
+        else:
+            return cls()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index d4482ae..29cb0e8 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -25,6 +25,7 @@ except ImportError:
     _celery = None
     app = None
 
+import aria
 from aria.modeling import models
 from aria.orchestrator import events
 from aria.orchestrator.workflows.executor import (
@@ -41,12 +42,13 @@ def _get_implementation(func):
     return '{module}.{func.__name__}'.format(module=__name__, func=func)
 
 
-def test_execute(executor):
+def execute_and_assert(executor, storage=None):
     expected_value = 'value'
-    successful_task = MockTask(_get_implementation(mock_successful_task))
-    failing_task = MockTask(_get_implementation(mock_failing_task))
+    successful_task = MockTask(_get_implementation(mock_successful_task), storage=storage)
+    failing_task = MockTask(_get_implementation(mock_failing_task), storage=storage)
     task_with_inputs = MockTask(_get_implementation(mock_task_with_input),
-                                inputs={'input': models.Parameter.wrap('input', 'value')})
+                                inputs={'input': models.Parameter.wrap('input', 'value')},
+                                storage=storage)
 
     for task in [successful_task, failing_task, task_with_inputs]:
         executor.execute(task)
@@ -62,6 +64,14 @@ def test_execute(executor):
     assertion()
 
 
+def test_thread_execute(thread_executor):
+    execute_and_assert(thread_executor)
+
+
+def test_process_execute(process_executor, storage):
+    execute_and_assert(process_executor, storage)
+
+
 def mock_successful_task(**_):
     pass
 
@@ -83,21 +93,35 @@ class MockException(Exception):
     pass
 
 
+@pytest.fixture
+def storage(tmpdir):
+    return aria.application_model_storage(
+        aria.storage.sql_mapi.SQLAlchemyModelAPI,
+        initiator_kwargs=dict(base_dir=str(tmpdir))
+    )
+
+
 @pytest.fixture(params=[
     (thread.ThreadExecutor, {'pool_size': 1}),
     (thread.ThreadExecutor, {'pool_size': 2}),
     # subprocess needs to load a tests module so we explicitly add the root directory as if
     # the project has been installed in editable mode
-    (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
     # (celery.CeleryExecutor, {'app': app})
 ])
-def executor(request):
+def thread_executor(request):
     executor_cls, executor_kwargs = request.param
     result = executor_cls(**executor_kwargs)
     yield result
     result.close()
 
 
+@pytest.fixture
+def process_executor():
+    result = process.ProcessExecutor(python_path=tests.ROOT_DIR)
+    yield result
+    result.close()
+
+
 @pytest.fixture(autouse=True)
 def register_signals():
     def start_handler(task, *args, **kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 5f240b2..e6333e8 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -18,6 +18,7 @@ import Queue
 
 import pytest
 
+import aria
 from aria.orchestrator import events
 from aria.utils.plugin import create as create_plugin
 from aria.orchestrator.workflows.executor import process
@@ -34,8 +35,8 @@ from . import MockTask
 
 class TestProcessExecutor(object):
 
-    def test_plugin_execution(self, executor, mock_plugin):
-        task = MockTask('mock_plugin1.operation', plugin=mock_plugin)
+    def test_plugin_execution(self, executor, mock_plugin, storage):
+        task = MockTask('mock_plugin1.operation', plugin=mock_plugin, storage=storage)
 
         queue = Queue.Queue()
 
@@ -81,3 +82,11 @@ def mock_plugin(plugin_manager, tmpdir):
     source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
     plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
     return plugin_manager.install(source=plugin_path)
+
+
+@pytest.fixture
+def storage(tmpdir):
+    return aria.application_model_storage(
+        aria.storage.sql_mapi.SQLAlchemyModelAPI,
+        initiator_kwargs=dict(base_dir=str(tmpdir))
+    )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2ee06b8a/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
index 673103e..bdbb17e 100644
--- a/tests/storage/test_instrumentation.py
+++ b/tests/storage/test_instrumentation.py
@@ -227,6 +227,7 @@ class TestInstrumentation(object):
 
         instrumentation.apply_tracked_changes(
             tracked_changes=instrument.tracked_changes,
+            new_instances={},
             model=storage)
 
         instance1_1, instance1_2, instance2_1, instance2_2 = get_instances()
@@ -273,8 +274,40 @@ class TestInstrumentation(object):
         instrument.clear()
         assert instrument.tracked_changes == {}
 
-    def _track_changes(self, instrumented):
-        instrument = instrumentation.track_changes(instrumented)
+    def test_new_instances(self, storage):
+        model_kwargs = dict(
+            name='name',
+            dict1={'initial': 'value'},
+            dict2={'initial': 'value'},
+            list1=['initial'],
+            list2=['initial'],
+            int1=0,
+            int2=0,
+            string2='string')
+        model_instance_1 = MockModel1(**model_kwargs)
+        model_instance_2 = MockModel2(**model_kwargs)
+
+        instrument = self._track_changes(model=storage, instrumented_new=(MockModel1,))
+        assert not instrument.tracked_changes
+
+        storage.mock_model_1.put(model_instance_1)
+        storage.mock_model_2.put(model_instance_2)
+        # Assert all models made it to storage
+        assert len(storage.mock_model_1.list()) == len(storage.mock_model_2.list()) == 1
+
+        # Assert only one model was tracked
+        assert len(instrument.new_instances) == 1
+
+        mock_model_1 = instrument.new_instances[MockModel1.__tablename__].values()[0]
+        storage_model1_instance = storage.mock_model_1.get(model_instance_1.id)
+
+        for key in model_kwargs:
+            assert mock_model_1[key] == model_kwargs[key] == getattr(storage_model1_instance, key)
+
+    def _track_changes(self, instrumented_modified=None, model=None, instrumented_new=None):
+        instrument = instrumentation.track_changes(
+            model=model,
+            instrumented={'modified': instrumented_modified or {}, 'new': instrumented_new or {}})
         instruments_holder.append(instrument)
         return instrument