You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/11 11:22:55 UTC
incubator-ariatosca git commit: added tests and moved expunge to
process.py [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue 9253cfdeb -> 1b9c63dcc (forced update)
added tests and moved expunge to process.py
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/1b9c63dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/1b9c63dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/1b9c63dc
Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue
Commit: 1b9c63dcce1e5d3625e77d8fab1e87e1cb991f0c
Parents: a5fc809
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu May 11 14:05:46 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu May 11 14:22:50 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/workflows/executor/process.py | 1 +
aria/storage/instrumentation.py | 37 ++++++++++++-------
tests/storage/test_instrumentation.py | 38 ++++++++++++++++++--
3 files changed, 61 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b9c63dc/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index a2f92f9..e8cf019 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -335,6 +335,7 @@ def _patch_ctx(ctx, messenger, instrument):
def patched_commit():
messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances)
+ instrument.expunge_session()
instrument.clear()
def patched_rollback():
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b9c63dc/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 98770de..6af9473 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -68,7 +68,9 @@ class _Instrumentation(object):
self.tracked_changes = {}
self.new_instances = {}
self.listeners = []
- self._track_changes(ctx, instrumented)
+ self._new_modeled_instances = []
+ self._ctx = ctx
+ self._track_changes(instrumented)
self._new_instance_index = 0
@property
@@ -76,7 +78,17 @@ class _Instrumentation(object):
self._new_instance_index += 1
return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, index=self._new_instance_index - 1)
- def _track_changes(self, ctx, instrumented):
+ def expunge_session(self):
+ for new_instance in self._new_modeled_instances:
+ self._get_session_from_ctx(new_instance.__tablename__).expunge(new_instance)
+
+ def _get_session_from_ctx(self, tablename):
+ mapi = getattr(self._ctx.model, tablename, None)
+ if mapi:
+ return mapi._session
+ raise StorageError("Could not retrieve session for {0}".format(tablename))
+
+ def _track_changes(self, instrumented):
instrumented_attribute_classes = {}
for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
self._register_set_attribute_listener(
@@ -93,29 +105,27 @@ class _Instrumentation(object):
# instrument creation of new instances
for instrumented_class in instrumented.get('new', {}):
- self._register_instance_listener(ctx, instrumented_class)
+ self._register_instance_listener(instrumented_class)
- def _register_instance_listener(self, ctx, instrumented_class):
- if ctx is None:
+ def _register_instance_listener(self, instrumented_class):
+ if self._ctx is None:
if instrumented_class:
raise StorageError("In order to keep track of new instances, a ctx is needed")
else:
return
- def listener(session, instance):
+ def listener(_, instance):
if not isinstance(instance, instrumented_class):
return
- session.expunge(instance)
+ self._new_modeled_instances.append(instance)
tracked_instances = self.new_instances.setdefault(instance.__modelname__, {})
tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {})
instance_as_dict = instance.to_dict()
- instance_as_dict.update((k, getattr(instance, k)) for k in instance.__private_fields__)
+ instance_as_dict.update((k, getattr(instance, k))
+ for k in getattr(instance, '__private_fields__', []))
tracked_attributes.update(instance_as_dict)
- mapi = getattr(ctx.model, instrumented_class.__tablename__, None)
- if not mapi:
- raise StorageError(
- "Could not retrieve session for {0}".format(instrumented_class.__tablename__))
- listener_args = (mapi._session, 'after_attach', listener)
+ session = self._get_session_from_ctx(instrumented_class.__tablename__)
+ listener_args = (session, 'after_attach', listener)
sqlalchemy.event.listen(*listener_args)
self.listeners.append(listener_args)
@@ -168,6 +178,7 @@ class _Instrumentation(object):
self.tracked_changes.clear()
self.new_instances.clear()
+ self._new_modeled_instances = []
def restore(self):
"""Remove all listeners registered by this instrumentation"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b9c63dc/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
index 227ee7b..24c4204 100644
--- a/tests/storage/test_instrumentation.py
+++ b/tests/storage/test_instrumentation.py
@@ -27,6 +27,7 @@ from aria.storage import (
sql_mapi,
instrumentation
)
+from tests.orchestrator.workflows.executor import MockContext
from . import release_sqlite_storage, init_inmemory_model_storage
@@ -275,8 +276,41 @@ class TestInstrumentation(object):
instrument.clear()
assert instrument.tracked_changes == {}
- def _track_changes(self, instrumented):
- instrument = instrumentation.track_changes(instrumented={'modified': instrumented})
+ def test_new_instances(self, storage):
+ model_kwargs = dict(
+ name='name',
+ dict1={'initial': 'value'},
+ dict2={'initial': 'value'},
+ list1=['initial'],
+ list2=['initial'],
+ int1=0,
+ int2=0,
+ string2='string')
+ model_instance_1 = MockModel1(**model_kwargs)
+ model_instance_2 = MockModel2(**model_kwargs)
+
+ ctx = MockContext(storage)
+ instrument = self._track_changes(ctx=ctx, instrumented_new=(MockModel1, ))
+ assert not instrument.tracked_changes
+
+ storage.mock_model_1.put(model_instance_1)
+ storage.mock_model_2.put(model_instance_2)
+ # Assert all models made it to storage
+ assert len(storage.mock_model_1.list()) == len(storage.mock_model_2.list()) == 1
+
+ # Assert only one model was tracked
+ assert len(instrument.new_instances) == 1
+
+ mock_model_1 = instrument.new_instances[MockModel1.__tablename__].values()[0]
+ storage_model1_instance = storage.mock_model_1.get(model_instance_1.id)
+
+ for key in model_kwargs:
+ assert mock_model_1[key] == model_kwargs[key] == getattr(storage_model1_instance, key)
+
+ def _track_changes(self, instrumented_modified=None, ctx=None, instrumented_new=None):
+ instrument = instrumentation.track_changes(
+ ctx=ctx,
+ instrumented={'modified': instrumented_modified or {}, 'new': instrumented_new or {}})
instruments_holder.append(instrument)
return instrument