You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/11 11:22:55 UTC

incubator-ariatosca git commit: added tests and moved expunge to process.py [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue 9253cfdeb -> 1b9c63dcc (forced update)


Added a test for new-instance tracking and moved the session expunge out of the after_attach listener into patched_commit in process.py


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/1b9c63dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/1b9c63dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/1b9c63dc

Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue
Commit: 1b9c63dcce1e5d3625e77d8fab1e87e1cb991f0c
Parents: a5fc809
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu May 11 14:05:46 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu May 11 14:22:50 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py |  1 +
 aria/storage/instrumentation.py                 | 37 ++++++++++++-------
 tests/storage/test_instrumentation.py           | 38 ++++++++++++++++++--
 3 files changed, 61 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b9c63dc/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index a2f92f9..e8cf019 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -335,6 +335,7 @@ def _patch_ctx(ctx, messenger, instrument):
 
     def patched_commit():
         messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances)
+        instrument.expunge_session()
         instrument.clear()
 
     def patched_rollback():

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b9c63dc/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 98770de..6af9473 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -68,7 +68,9 @@ class _Instrumentation(object):
         self.tracked_changes = {}
         self.new_instances = {}
         self.listeners = []
-        self._track_changes(ctx, instrumented)
+        self._new_modeled_instances = []
+        self._ctx = ctx
+        self._track_changes(instrumented)
         self._new_instance_index = 0
 
     @property
@@ -76,7 +78,17 @@ class _Instrumentation(object):
         self._new_instance_index += 1
         return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, index=self._new_instance_index - 1)
 
-    def _track_changes(self, ctx, instrumented):
+    def expunge_session(self):
+        for new_instance in self._new_modeled_instances:
+            self._get_session_from_ctx(new_instance.__tablename__).expunge(new_instance)
+
+    def _get_session_from_ctx(self, tablename):
+        mapi = getattr(self._ctx.model, tablename, None)
+        if mapi:
+            return mapi._session
+        raise StorageError("Could not retrieve session for {0}".format(tablename))
+
+    def _track_changes(self, instrumented):
         instrumented_attribute_classes = {}
         for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
             self._register_set_attribute_listener(
@@ -93,29 +105,27 @@ class _Instrumentation(object):
 
         # instrument creation of new instances
         for instrumented_class in instrumented.get('new', {}):
-            self._register_instance_listener(ctx, instrumented_class)
+            self._register_instance_listener(instrumented_class)
 
-    def _register_instance_listener(self, ctx, instrumented_class):
-        if ctx is None:
+    def _register_instance_listener(self, instrumented_class):
+        if self._ctx is None:
             if instrumented_class:
                 raise StorageError("In order to keep track of new instances, a ctx is needed")
             else:
                 return
 
-        def listener(session, instance):
+        def listener(_, instance):
             if not isinstance(instance, instrumented_class):
                 return
-            session.expunge(instance)
+            self._new_modeled_instances.append(instance)
             tracked_instances = self.new_instances.setdefault(instance.__modelname__, {})
             tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {})
             instance_as_dict = instance.to_dict()
-            instance_as_dict.update((k, getattr(instance, k)) for k in instance.__private_fields__)
+            instance_as_dict.update((k, getattr(instance, k))
+                                    for k in getattr(instance, '__private_fields__', []))
             tracked_attributes.update(instance_as_dict)
-        mapi = getattr(ctx.model, instrumented_class.__tablename__, None)
-        if not mapi:
-            raise StorageError(
-                "Could not retrieve session for {0}".format(instrumented_class.__tablename__))
-        listener_args = (mapi._session, 'after_attach', listener)
+        session = self._get_session_from_ctx(instrumented_class.__tablename__)
+        listener_args = (session, 'after_attach', listener)
         sqlalchemy.event.listen(*listener_args)
         self.listeners.append(listener_args)
 
@@ -168,6 +178,7 @@ class _Instrumentation(object):
             self.tracked_changes.clear()
 
         self.new_instances.clear()
+        self._new_modeled_instances = []
 
     def restore(self):
         """Remove all listeners registered by this instrumentation"""

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b9c63dc/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
index 227ee7b..24c4204 100644
--- a/tests/storage/test_instrumentation.py
+++ b/tests/storage/test_instrumentation.py
@@ -27,6 +27,7 @@ from aria.storage import (
     sql_mapi,
     instrumentation
 )
+from tests.orchestrator.workflows.executor import MockContext
 
 from . import release_sqlite_storage, init_inmemory_model_storage
 
@@ -275,8 +276,41 @@ class TestInstrumentation(object):
         instrument.clear()
         assert instrument.tracked_changes == {}
 
-    def _track_changes(self, instrumented):
-        instrument = instrumentation.track_changes(instrumented={'modified': instrumented})
+    def test_new_instances(self, storage):
+        model_kwargs = dict(
+            name='name',
+            dict1={'initial': 'value'},
+            dict2={'initial': 'value'},
+            list1=['initial'],
+            list2=['initial'],
+            int1=0,
+            int2=0,
+            string2='string')
+        model_instance_1 = MockModel1(**model_kwargs)
+        model_instance_2 = MockModel2(**model_kwargs)
+
+        ctx = MockContext(storage)
+        instrument = self._track_changes(ctx=ctx, instrumented_new=(MockModel1, ))
+        assert not instrument.tracked_changes
+
+        storage.mock_model_1.put(model_instance_1)
+        storage.mock_model_2.put(model_instance_2)
+        # Assert all models made it to storage
+        assert len(storage.mock_model_1.list()) == len(storage.mock_model_2.list()) == 1
+
+        # Assert only one model was tracked
+        assert len(instrument.new_instances) == 1
+
+        mock_model_1 = instrument.new_instances[MockModel1.__tablename__].values()[0]
+        storage_model1_instance = storage.mock_model_1.get(model_instance_1.id)
+
+        for key in model_kwargs:
+            assert mock_model_1[key] == model_kwargs[key] == getattr(storage_model1_instance, key)
+
+    def _track_changes(self, instrumented_modified=None, ctx=None, instrumented_new=None):
+        instrument = instrumentation.track_changes(
+            ctx=ctx,
+            instrumented={'modified': instrumented_modified or {}, 'new': instrumented_new or {}})
         instruments_holder.append(instrument)
         return instrument