Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/02/17 19:12:55 UTC

[3/3] incubator-ariatosca git commit: ARIA-79-concurrent-modifications

ARIA-79-concurrent-modifications

Handle concurrent storage modifications: add a version column to Node and opt
it into SQLAlchemy's optimistic locking (version_id_col), surface version
conflicts as StorageError, and rework the process executor's status messages
into a request/response protocol so failures in applying tracked changes are
reported back to the subprocess.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/96581d9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/96581d9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/96581d9f

Branch: refs/heads/ARIA-79-concurrent-storage-modifications
Commit: 96581d9f9b00736d3c83ecda20d2e9f81073d4da
Parents: 787d7e7
Author: Dan Kilman <da...@gigaspaces.com>
Authored: Mon Jan 30 16:49:00 2017 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Fri Feb 17 21:12:36 2017 +0200

----------------------------------------------------------------------
 aria/.pylintrc                                  |   2 +-
 aria/orchestrator/workflows/executor/process.py | 162 +++++++++++-------
 aria/storage/instrumentation.py                 |  73 +++++++--
 aria/storage/modeling/instance_elements.py      |   5 +-
 aria/storage/sql_mapi.py                        |   4 +
 aria/storage_initializer.py                     |   1 -
 tests/mock/models.py                            |   2 -
 ...process_executor_concurrent_modifications.py | 164 +++++++++++++++++++
 tests/requirements.txt                          |   3 +-
 tests/storage/test_structures.py                |   1 -
 10 files changed, 338 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/.pylintrc
----------------------------------------------------------------------
diff --git a/aria/.pylintrc b/aria/.pylintrc
index ee4d0ef..b7656a3 100644
--- a/aria/.pylintrc
+++ b/aria/.pylintrc
@@ -250,7 +250,7 @@ docstring-min-length=-1
 [ELIF]
 
 # Maximum number of nested blocks for function / method body
-max-nested-blocks=5
+max-nested-blocks=7
 
 
 [SIMILARITIES]

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 560ac43..75bbbce 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -53,6 +53,8 @@ _IS_WIN = os.name == 'nt'
 
 _INT_FMT = 'I'
 _INT_SIZE = struct.calcsize(_INT_FMT)
+UPDATE_TRACKED_CHANGES_FAILED_STR = \
+    'Some changes failed writing to storage. For more information, refer to the log.'
 
 
 class ProcessExecutor(base.BaseExecutor):
@@ -74,6 +76,13 @@ class ProcessExecutor(base.BaseExecutor):
         # Contains reference to all currently running tasks
         self._tasks = {}
 
+        self._request_handlers = {
+            'started': self._handle_task_started_request,
+            'succeeded': self._handle_task_succeeded_request,
+            'failed': self._handle_task_failed_request,
+            'apply_tracked_changes': self._handle_apply_tracked_changes_request
+        }
+
         # Server socket used to accept task status messages from subprocesses
         self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._server_socket.bind(('localhost', 0))
@@ -131,58 +140,6 @@ class ProcessExecutor(base.BaseExecutor):
     def _remove_task(self, task_id):
         return self._tasks.pop(task_id)
 
-    def _listener(self):
-        # Notify __init__ method this thread has actually started
-        self._listener_started.put(True)
-        while not self._stopped:
-            try:
-                # Accept messages written to the server socket
-                with contextlib.closing(self._server_socket.accept()[0]) as connection:
-                    message = self._recv_message(connection)
-                    message_type = message['type']
-                    if message_type == 'closed':
-                        break
-                    task_id = message['task_id']
-                    if message_type == 'started':
-                        self._task_started(self._tasks[task_id])
-                    elif message_type == 'apply_tracked_changes':
-                        task = self._tasks[task_id]
-                        instrumentation.apply_tracked_changes(
-                            tracked_changes=message['tracked_changes'],
-                            model=task.context.model)
-                    elif message_type == 'succeeded':
-                        task = self._remove_task(task_id)
-                        instrumentation.apply_tracked_changes(
-                            tracked_changes=message['tracked_changes'],
-                            model=task.context.model)
-                        self._task_succeeded(task)
-                    elif message_type == 'failed':
-                        task = self._remove_task(task_id)
-                        instrumentation.apply_tracked_changes(
-                            tracked_changes=message['tracked_changes'],
-                            model=task.context.model)
-                        self._task_failed(task, exception=message['exception'])
-                    else:
-                        raise RuntimeError('Invalid state')
-            except BaseException as e:
-                self.logger.debug('Error in process executor listener: {0}'.format(e))
-
-    def _recv_message(self, connection):
-        message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE))
-        return jsonpickle.loads(self._recv_bytes(connection, message_len))
-
-    @staticmethod
-    def _recv_bytes(connection, count):
-        result = io.BytesIO()
-        while True:
-            if not count:
-                return result.getvalue()
-            read = connection.recv(count)
-            if not read:
-                return result.getvalue()
-            result.write(read)
-            count -= len(read)
-
     def _check_closed(self):
         if self._stopped:
             raise RuntimeError('Executor closed')
@@ -231,6 +188,90 @@ class ProcessExecutor(base.BaseExecutor):
                 os.pathsep,
                 env.get('PYTHONPATH', ''))
 
+    def _listener(self):
+        # Notify the __init__ method that this thread has actually started
+        self._listener_started.put(True)
+        while not self._stopped:
+            try:
+                with self._accept_request() as (request, response):
+                    request_type = request['type']
+                    if request_type == 'closed':
+                        break
+                    request_handler = self._request_handlers.get(request_type)
+                    if not request_handler:
+                        raise RuntimeError('Invalid request type: {0}'.format(request_type))
+                    request_handler(task_id=request['task_id'], request=request, response=response)
+            except BaseException as e:
+                self.logger.debug('Error in process executor listener: {0}'.format(e))
+
+    @contextlib.contextmanager
+    def _accept_request(self):
+        with contextlib.closing(self._server_socket.accept()[0]) as connection:
+            message = _recv_message(connection)
+            response = {}
+            yield message, response
+            _send_message(connection, response)
+
+    def _handle_task_started_request(self, task_id, **kwargs):
+        self._task_started(self._tasks[task_id])
+
+    def _handle_task_succeeded_request(self, task_id, request, **kwargs):
+        task = self._remove_task(task_id)
+        try:
+            self._apply_tracked_changes(task, request)
+        except BaseException as e:
+            e.message += ' ' + UPDATE_TRACKED_CHANGES_FAILED_STR
+            self._task_failed(task, exception=e)
+        else:
+            self._task_succeeded(task)
+
+    def _handle_task_failed_request(self, task_id, request, **kwargs):
+        task = self._remove_task(task_id)
+        try:
+            self._apply_tracked_changes(task, request)
+        except BaseException as e:
+            e.message += ' Task failed due to {0}. '.format(request['exception']) + \
+                         UPDATE_TRACKED_CHANGES_FAILED_STR
+            self._task_failed(task, exception=e)
+        else:
+            self._task_failed(task, exception=request['exception'])
+
+    def _handle_apply_tracked_changes_request(self, task_id, request, response):
+        task = self._tasks[task_id]
+        try:
+            self._apply_tracked_changes(task, request)
+        except BaseException as e:
+            response['exception'] = exceptions.wrap_if_needed(e)
+
+    @staticmethod
+    def _apply_tracked_changes(task, request):
+        instrumentation.apply_tracked_changes(
+            tracked_changes=request['tracked_changes'],
+            model=task.context.model)
+
+
+def _send_message(connection, message):
+    data = jsonpickle.dumps(message)
+    connection.send(struct.pack(_INT_FMT, len(data)))
+    connection.sendall(data)
+
+
+def _recv_message(connection):
+    message_len, = struct.unpack(_INT_FMT, _recv_bytes(connection, _INT_SIZE))
+    return jsonpickle.loads(_recv_bytes(connection, message_len))
+
+
+def _recv_bytes(connection, count):
+    result = io.BytesIO()
+    while True:
+        if not count:
+            return result.getvalue()
+        read = connection.recv(count)
+        if not read:
+            return result.getvalue()
+        result.write(read)
+        count -= len(read)
+
 
 class _Messenger(object):
 
@@ -261,17 +302,16 @@ class _Messenger(object):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.connect(('localhost', self.port))
         try:
-            data = jsonpickle.dumps({
+            _send_message(sock, {
                 'type': type,
                 'task_id': self.task_id,
                 'exception': exceptions.wrap_if_needed(exception),
                 'tracked_changes': tracked_changes
             })
-            sock.send(struct.pack(_INT_FMT, len(data)))
-            sock.sendall(data)
-            # send message will block until the server side closes the connection socket
-            # because we want it to be synchronous
-            sock.recv(1)
+            response = _recv_message(sock)
+            response_exception = response.get('exception')
+            if response_exception:
+                raise response_exception
         finally:
             sock.close()
 
@@ -294,12 +334,17 @@ def _patch_session(ctx, messenger, instrument):
         messenger.apply_tracked_changes(instrument.tracked_changes)
         instrument.clear()
 
+    def patched_rollback():
+        # Rollback is performed on the parent process when a commit fails
+        pass
+
     # When autoflush is set to true (the default), refreshing an object triggers
     # an autoflush by sqlalchemy. This autoflush will attempt to commit changes made
     # so far on the session, which is not the desired behavior in the subprocess.
     session.autoflush = False
 
     session.commit = patched_commit
+    session.rollback = patched_rollback
     session.refresh = patched_refresh
 
 
@@ -324,7 +369,6 @@ def _main():
     # This is required for the instrumentation to work properly.
     # See docstring of `remove_mutable_association_listener` for further details
     storage_type.remove_mutable_association_listener()
-
     with instrumentation.track_changes() as instrument:
         try:
             ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])

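The process.py rework above replaces the old one-byte synchronization read
(sock.recv(1)) with a full request/response round trip over the same
length-prefixed framing: each frame is a native unsigned int (struct format
'I') carrying the payload length, followed by a jsonpickle-encoded body.
Below is a minimal, self-contained sketch of one such round trip; the echo
server and the explicit UTF-8 byte handling are illustrative (the commit
itself appears to target Python 2, where jsonpickle.dumps already yields
bytes), and only the framing mirrors the diff.

    import socket
    import struct
    import threading

    import jsonpickle

    _INT_FMT = 'I'
    _INT_SIZE = struct.calcsize(_INT_FMT)


    def send_message(connection, message):
        # Frame: fixed-size length prefix, then the jsonpickle payload
        data = jsonpickle.dumps(message).encode('utf-8')
        connection.send(struct.pack(_INT_FMT, len(data)))
        connection.sendall(data)


    def recv_exactly(connection, count):
        # Loop until `count` bytes arrive; recv() may return short reads
        chunks = []
        while count:
            chunk = connection.recv(count)
            if not chunk:
                raise EOFError('peer closed the connection mid-frame')
            chunks.append(chunk)
            count -= len(chunk)
        return b''.join(chunks)


    def recv_message(connection):
        length, = struct.unpack(_INT_FMT, recv_exactly(connection, _INT_SIZE))
        return jsonpickle.loads(recv_exactly(connection, length).decode('utf-8'))


    def serve_once(server_socket):
        # Stand-in for ProcessExecutor._accept_request: read one request,
        # then answer on the same connection
        connection, _ = server_socket.accept()
        try:
            request = recv_message(connection)
            send_message(connection, {'exception': None, 'echo': request})
        finally:
            connection.close()


    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('localhost', 0))
    server.listen(1)
    threading.Thread(target=serve_once, args=(server,)).start()

    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('localhost', server.getsockname()[1]))
    send_message(client, {'type': 'apply_tracked_changes', 'task_id': 42})
    print(recv_message(client))  # _Messenger would raise response['exception'] if set
    client.close()
    server.close()

The behavioral change that matters here is in _Messenger: it now blocks on a
full response frame instead of a single byte, which is what allows the parent
process to ship a storage exception back into the subprocess.
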
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 57fe9bd..8fb9d82 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -14,11 +14,16 @@
 # limitations under the License.
 
 import copy
+import json
 
+import sqlalchemy
 import sqlalchemy.event
 
+from . import exceptions
+
 from .modeling import model as _model
 
+_VERSION_ID_COL = 'version'
 _STUB = object()
 _INSTRUMENTED = {
     _model.Node.runtime_properties: dict
@@ -92,6 +97,11 @@ class _Instrumentation(object):
             mapi_name = instrumented_class.__modelname__
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
+            if hasattr(target, _VERSION_ID_COL):
+                # We want to keep track of the initial version id so it can be compared
+                # with the committed version id when the tracked changes are applied
+                tracked_attributes.setdefault(_VERSION_ID_COL,
+                                              _Value(_STUB, getattr(target, _VERSION_ID_COL)))
             for attribute_name, attribute_type in instrumented_attributes.items():
                 if attribute_name not in tracked_attributes:
                     initial = getattr(target, attribute_name)
@@ -143,7 +153,11 @@ class _Value(object):
         return self.initial == other.initial and self.current == other.current
 
     def __hash__(self):
-        return hash(self.initial) ^ hash(self.current)
+        return hash((self.initial, self.current))
+
+    @property
+    def dict(self):
+        return {'initial': self.initial, 'current': self.current}
 
 
 def apply_tracked_changes(tracked_changes, model):
@@ -153,14 +167,49 @@ def apply_tracked_changes(tracked_changes, model):
                             returned by calling ``track_changes()``
     :param model: The model storage used to actually apply the changes
     """
-    for mapi_name, tracked_instances in tracked_changes.items():
-        mapi = getattr(model, mapi_name)
-        for instance_id, tracked_attributes in tracked_instances.items():
-            instance = None
-            for attribute_name, value in tracked_attributes.items():
-                if value.initial != value.current:
-                    if not instance:
-                        instance = mapi.get(instance_id)
-                    setattr(instance, attribute_name, value.current)
-            if instance:
-                mapi.update(instance)
+    successfully_updated_changes = dict()
+    try:
+        for mapi_name, tracked_instances in tracked_changes.items():
+            successfully_updated_changes[mapi_name] = dict()
+            mapi = getattr(model, mapi_name)
+            for instance_id, tracked_attributes in tracked_instances.items():
+                successfully_updated_changes[mapi_name][instance_id] = dict()
+                instance = None
+                for attribute_name, value in tracked_attributes.items():
+                    if value.initial != value.current:
+                        if not instance:
+                            instance = mapi.get(instance_id)
+                        setattr(instance, attribute_name, value.current)
+                if instance:
+                    _validate_version_id(instance, mapi)
+                    mapi.update(instance)
+                    successfully_updated_changes[mapi_name][instance_id] = [
+                        v.dict for v in tracked_attributes.values()]
+    except BaseException:
+        for key in list(successfully_updated_changes):
+            if not successfully_updated_changes[key]:
+                del successfully_updated_changes[key]
+        model.logger.error(
+            'Registering the changes to storage failed.\n'
+            'The successful updates were:\n'
+            '{0}'.format(json.dumps(successfully_updated_changes, indent=4)))
+
+        raise
+
+
+def _validate_version_id(instance, mapi):
+    version_id = sqlalchemy.inspect(instance).committed_state.get(_VERSION_ID_COL)
+    # There are two version conflict code paths:
+    # 1. The committed state already loaded for the instance holds a newer
+    #    version; in this case, we raise the error manually.
+    # 2. The UPDATE statement is executed with version validation, and
+    #    sqlalchemy will raise a StaleDataError if there is a version mismatch.
+    if version_id and getattr(instance, _VERSION_ID_COL) != version_id:
+        object_version_id = getattr(instance, _VERSION_ID_COL)
+        mapi._session.rollback()
+        raise exceptions.StorageError(
+            'Version conflict: committed and object {0} differ '
+            '[committed {0}={1}, object {0}={2}]'
+            .format(_VERSION_ID_COL,
+                    version_id,
+                    object_version_id))

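The _validate_version_id helper above leans on SQLAlchemy's inspection API:
roughly, inspect(instance).committed_state maps attribute names to their
database-loaded values, with an entry present only for attributes that carry
pending changes. A hedged sketch of the check in isolation, assuming a mapped
class with the `version` column that the next hunk adds to NodeBase:

    import sqlalchemy


    def has_version_conflict(instance, version_attr='version'):
        # The value the session believes is in the database, present only
        # when the attribute has a pending change
        committed = sqlalchemy.inspect(instance).committed_state.get(version_attr)
        return committed is not None and committed != getattr(instance, version_attr)

Even when this check passes, mapi.update() still executes the UPDATE with
version validation, so a race that slips past it is caught by the
StaleDataError path handled in the sql_mapi.py hunk below.
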
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/modeling/instance_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/instance_elements.py b/aria/storage/modeling/instance_elements.py
index 0666c8a..2b102f1 100644
--- a/aria/storage/modeling/instance_elements.py
+++ b/aria/storage/modeling/instance_elements.py
@@ -553,7 +553,7 @@ class PolicyBase(structure.ModelMixin):
 
     # region many-to-one relationships
     @declared_attr
-    def service_instnce(cls):
+    def service_instance(cls):
         return cls.many_to_one_relationship('service_instance')
 
     # region many-to-many relationships
@@ -851,6 +851,8 @@ class NodeBase(structure.ModelMixin):
     * :code:`relationships`: List of :class:`Relationship`
     """
     __tablename__ = 'node'
+    version = Column(Integer, nullable=False)
+    __mapper_args__ = {'version_id_col': version}
 
     __private_fields__ = ['service_instance_fk',
                           'host_fk',
@@ -878,7 +880,6 @@ class NodeBase(structure.ModelMixin):
     runtime_properties = Column(aria_types.Dict)
     scaling_groups = Column(aria_types.List)
     state = Column(Text, nullable=False)
-    version = Column(Integer, default=1)
 
     @declared_attr
     def plugins(cls):

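Setting __mapper_args__ = {'version_id_col': version} opts NodeBase into
SQLAlchemy's optimistic locking: every UPDATE emits WHERE version = :expected
and bumps the counter, and a zero-row match raises StaleDataError at flush
time. A toy end-to-end illustration; the class, the file-backed engine URL,
and the sequential two-session "race" are invented for the example:

    from sqlalchemy import Column, Integer, Text, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.orm.exc import StaleDataError

    Base = declarative_base()


    class Node(Base):
        __tablename__ = 'node'
        id = Column(Integer, primary_key=True)
        state = Column(Text)
        version = Column(Integer, nullable=False)
        __mapper_args__ = {'version_id_col': version}


    # File-backed so that both sessions see the same database
    engine = create_engine('sqlite:///version_demo.db')
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

    setup = Session()
    setup.add(Node(id=1, state='initial'))
    setup.commit()  # SQLAlchemy assigns version=1 on insert
    setup.close()

    first, second = Session(), Session()
    stale = second.query(Node).get(1)   # loaded at version=1

    winner = first.query(Node).get(1)
    winner.state = 'first-writer'
    first.commit()                      # UPDATE ... WHERE version = 1; row is now version=2
    first.close()

    stale.state = 'second-writer'
    try:
        second.commit()                 # UPDATE ... WHERE version = 1 matches zero rows
    except StaleDataError:
        second.rollback()
        print('version conflict detected')

This also explains the storage_initializer.py and tests/mock/models.py hunks
below: with version_id_col in charge of the counter, callers must stop passing
version=None explicitly.
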
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
index b80ac8e..2711aee 100644
--- a/aria/storage/sql_mapi.py
+++ b/aria/storage/sql_mapi.py
@@ -23,6 +23,7 @@ from sqlalchemy import (
     orm,
 )
 from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.orm.exc import StaleDataError
 
 from aria.utils.collections import OrderedDict
 from . import (
@@ -162,6 +163,9 @@ class SQLAlchemyModelAPI(api.ModelAPI):
         """
         try:
             self._session.commit()
+        except StaleDataError as e:
+            self._session.rollback()
+            raise exceptions.StorageError('Version conflict: {0}'.format(str(e)))
         except (SQLAlchemyError, ValueError) as e:
             self._session.rollback()
             raise exceptions.StorageError('SQL Storage error: {0}'.format(str(e)))

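With this hunk, callers never see StaleDataError directly; both conflict code
paths surface as StorageError('Version conflict: ...'). A hypothetical
caller-side recovery, mirroring the refresh pattern that the new test below
exercises (`model` stands in for an aria model storage):

    from aria.storage.exceptions import StorageError


    def update_or_refresh(model, node):
        # Attempt the optimistic update; on a version conflict, reload the
        # committed row so the caller can reapply its change and try again
        try:
            model.node.update(node)
            return True
        except StorageError as e:
            if 'Version conflict' not in str(e):
                raise
            model.node.refresh(node)
            return False
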
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage_initializer.py
----------------------------------------------------------------------
diff --git a/aria/storage_initializer.py b/aria/storage_initializer.py
index aea5ec8..175ec22 100644
--- a/aria/storage_initializer.py
+++ b/aria/storage_initializer.py
@@ -95,7 +95,6 @@ def _create_node_instance(service_instance, node, node_model):
         service_instance=service_instance,
         name=node_model.id,
         runtime_properties={},
-        version=None,
         node_template=node,
         state='',
         scaling_groups=[]

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 047526a..301fc01 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -50,7 +50,6 @@ def get_dependency_node_instance(dependency_node, deployment):
         name=DEPENDENCY_NODE_INSTANCE_NAME,
         service_instance=deployment,
         runtime_properties={'ip': '1.1.1.1'},
-        version=None,
         node_template=dependency_node,
         state='',
         scaling_groups=[]
@@ -96,7 +95,6 @@ def get_dependent_node_instance(dependent_node, deployment):
         name=DEPENDENT_NODE_INSTANCE_NAME,
         service_instance=deployment,
         runtime_properties={},
-        version=None,
         node_template=dependent_node,
         state='',
         scaling_groups=[],

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
new file mode 100644
index 0000000..ad3cb76
--- /dev/null
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+import fasteners
+import pytest
+
+from aria.storage.exceptions import StorageError
+from aria.orchestrator import events
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests.orchestrator.context import execute as execute_workflow
+from tests.orchestrator.workflows.helpers import events_collector
+from tests import mock
+from tests import storage
+
+
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files):
+    _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True)
+
+
+@operation
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value):
+    _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
+
+
+def test_concurrent_modification_on_task_failed(context, executor, lock_files):
+    _test(context, executor, lock_files, _test_task_failed, expected_failure=True)
+
+
+@operation
+def _test_task_failed(ctx, lock_files, key, first_value, second_value):
+    first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
+    if not first:
+        raise RuntimeError('MESSAGE')
+
+
+def test_concurrent_modification_on_update_and_refresh(context, executor, lock_files):
+    _test(context, executor, lock_files, _test_update_and_refresh, expected_failure=False)
+
+
+@operation
+def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value):
+    node = ctx.node
+    first = _concurrent_update(lock_files, node, key, first_value, second_value)
+    if not first:
+        try:
+            ctx.model.node.update(node)
+        except StorageError as e:
+            assert 'Version conflict' in str(e)
+            ctx.model.node.refresh(node)
+        else:
+            raise RuntimeError('Unexpected')
+
+
+def _test(context, executor, lock_files, func, expected_failure):
+    def _node(ctx):
+        return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+
+    op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+    key = 'key'
+    first_value = 'value1'
+    second_value = 'value2'
+    inputs = {
+        'lock_files': lock_files,
+        'key': key,
+        'first_value': first_value,
+        'second_value': second_value
+    }
+
+    node = _node(context)
+    node.interfaces = [mock.models.get_interface(
+        op_name,
+        operation_kwargs=dict(implementation='{0}.{1}'.format(__name__, func.__name__))
+    )]
+    context.model.node.update(node)
+
+    @workflow
+    def mock_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs),
+            api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs)
+        )
+
+    signal = events.on_failure_task_signal
+    with events_collector(signal) as collected:
+        try:
+            execute_workflow(mock_workflow, context, executor)
+        except ExecutorException:
+            pass
+
+    props = _node(context).runtime_properties
+    assert props[key] == first_value
+
+    exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
+    if expected_failure:
+        assert exceptions
+        exception = exceptions[-1]
+        assert isinstance(exception, StorageError)
+        assert 'Version conflict' in str(exception)
+    else:
+        assert not exceptions
+
+
+@pytest.fixture
+def executor():
+    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+    yield result
+    result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+    result = mock.context.simple(str(tmpdir))
+    yield result
+    storage.release_sqlite_storage(result.model)
+
+
+@pytest.fixture
+def lock_files(tmpdir):
+    return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
+
+
+def _concurrent_update(lock_files, node, key, first_value, second_value):
+
+    locker1 = fasteners.InterProcessLock(lock_files[0])
+    locker2 = fasteners.InterProcessLock(lock_files[1])
+
+    first = locker1.acquire(blocking=False)
+
+    if first:
+        # Give both processes a chance to acquire their locks
+        while locker2.acquire(blocking=False):
+            locker2.release()
+            time.sleep(0.01)
+    else:
+        locker2.acquire()
+
+    node.runtime_properties[key] = first_value if first else second_value
+
+    if first:
+        locker1.release()
+    else:
+        with locker1:
+            locker2.release()
+
+    return first

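The two-lock handshake in _concurrent_update above is built on fasteners'
inter-process file locks (pinned in tests/requirements.txt below). It makes
sure both subprocesses stage their runtime_properties write against the same
node version before either operation returns, so the second commit is
guaranteed to conflict. The primitives it relies on, in a standalone sketch
with an invented lock path:

    import fasteners

    lock = fasteners.InterProcessLock('/tmp/demo.lock')

    if lock.acquire(blocking=False):    # non-blocking probe: True if acquired
        try:
            pass  # we won the race: the 'first' process branch
        finally:
            lock.release()
    else:
        # Blocking variant as a context manager; enters once the first
        # process releases the lock
        with fasteners.InterProcessLock('/tmp/demo.lock'):
            pass
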
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 0e4740f..2f0245a 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -11,8 +11,9 @@
 # limitations under the License.
 
 testtools
+fasteners==0.13.0
 mock==1.0.1
 pylint==1.6.4
 pytest==3.0.2
 pytest-cov==2.3.1
-pytest-mock==1.2
\ No newline at end of file
+pytest-mock==1.2

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/storage/test_structures.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_structures.py b/tests/storage/test_structures.py
index 30f0064..666256e 100644
--- a/tests/storage/test_structures.py
+++ b/tests/storage/test_structures.py
@@ -125,7 +125,6 @@ def test_relationship_model_ordering(context):
         name='new_node_instance',
         runtime_properties={},
         service_instance=service_instance,
-        version=None,
         node_template=new_node_template,
         state='',
         scaling_groups=[]