You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/02/17 19:12:55 UTC
[3/3] incubator-ariatosca git commit: ARIA-79-concurrent-modifications
ARIA-79-concurrent-modifications
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/96581d9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/96581d9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/96581d9f
Branch: refs/heads/ARIA-79-concurrent-storage-modifications
Commit: 96581d9f9b00736d3c83ecda20d2e9f81073d4da
Parents: 787d7e7
Author: Dan Kilman <da...@gigaspaces.com>
Authored: Mon Jan 30 16:49:00 2017 +0200
Committer: mxmrlv <mx...@gmail.com>
Committed: Fri Feb 17 21:12:36 2017 +0200
----------------------------------------------------------------------
aria/.pylintrc | 2 +-
aria/orchestrator/workflows/executor/process.py | 162 +++++++++++-------
aria/storage/instrumentation.py | 73 +++++++--
aria/storage/modeling/instance_elements.py | 5 +-
aria/storage/sql_mapi.py | 4 +
aria/storage_initializer.py | 1 -
tests/mock/models.py | 2 -
...process_executor_concurrent_modifications.py | 164 +++++++++++++++++++
tests/requirements.txt | 3 +-
tests/storage/test_structures.py | 1 -
10 files changed, 338 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/.pylintrc
----------------------------------------------------------------------
diff --git a/aria/.pylintrc b/aria/.pylintrc
index ee4d0ef..b7656a3 100644
--- a/aria/.pylintrc
+++ b/aria/.pylintrc
@@ -250,7 +250,7 @@ docstring-min-length=-1
[ELIF]
# Maximum number of nested blocks for function / method body
-max-nested-blocks=5
+max-nested-blocks=7
[SIMILARITIES]
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 560ac43..75bbbce 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -53,6 +53,8 @@ _IS_WIN = os.name == 'nt'
_INT_FMT = 'I'
_INT_SIZE = struct.calcsize(_INT_FMT)
+UPDATE_TRACKED_CHANGES_FAILED_STR = \
+ 'Some changes failed writing to storage. For more info refer to the log.'
class ProcessExecutor(base.BaseExecutor):
@@ -74,6 +76,13 @@ class ProcessExecutor(base.BaseExecutor):
# Contains reference to all currently running tasks
self._tasks = {}
+ self._request_handlers = {
+ 'started': self._handle_task_started_request,
+ 'succeeded': self._handle_task_succeeded_request,
+ 'failed': self._handle_task_failed_request,
+ 'apply_tracked_changes': self._handle_apply_tracked_changes_request
+ }
+
# Server socket used to accept task status messages from subprocesses
self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._server_socket.bind(('localhost', 0))
@@ -131,58 +140,6 @@ class ProcessExecutor(base.BaseExecutor):
def _remove_task(self, task_id):
return self._tasks.pop(task_id)
- def _listener(self):
- # Notify __init__ method this thread has actually started
- self._listener_started.put(True)
- while not self._stopped:
- try:
- # Accept messages written to the server socket
- with contextlib.closing(self._server_socket.accept()[0]) as connection:
- message = self._recv_message(connection)
- message_type = message['type']
- if message_type == 'closed':
- break
- task_id = message['task_id']
- if message_type == 'started':
- self._task_started(self._tasks[task_id])
- elif message_type == 'apply_tracked_changes':
- task = self._tasks[task_id]
- instrumentation.apply_tracked_changes(
- tracked_changes=message['tracked_changes'],
- model=task.context.model)
- elif message_type == 'succeeded':
- task = self._remove_task(task_id)
- instrumentation.apply_tracked_changes(
- tracked_changes=message['tracked_changes'],
- model=task.context.model)
- self._task_succeeded(task)
- elif message_type == 'failed':
- task = self._remove_task(task_id)
- instrumentation.apply_tracked_changes(
- tracked_changes=message['tracked_changes'],
- model=task.context.model)
- self._task_failed(task, exception=message['exception'])
- else:
- raise RuntimeError('Invalid state')
- except BaseException as e:
- self.logger.debug('Error in process executor listener: {0}'.format(e))
-
- def _recv_message(self, connection):
- message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE))
- return jsonpickle.loads(self._recv_bytes(connection, message_len))
-
- @staticmethod
- def _recv_bytes(connection, count):
- result = io.BytesIO()
- while True:
- if not count:
- return result.getvalue()
- read = connection.recv(count)
- if not read:
- return result.getvalue()
- result.write(read)
- count -= len(read)
-
def _check_closed(self):
if self._stopped:
raise RuntimeError('Executor closed')
@@ -231,6 +188,90 @@ class ProcessExecutor(base.BaseExecutor):
os.pathsep,
env.get('PYTHONPATH', ''))
+ def _listener(self):
+ # Notify __init__ method this thread has actually started
+ self._listener_started.put(True)
+ while not self._stopped:
+ try:
+ with self._accept_request() as (request, response):
+ request_type = request['type']
+ if request_type == 'closed':
+ break
+ request_handler = self._request_handlers.get(request_type)
+ if not request_handler:
+ raise RuntimeError('Invalid request type: {0}'.format(request_type))
+ request_handler(task_id=request['task_id'], request=request, response=response)
+ except BaseException as e:
+ self.logger.debug('Error in process executor listener: {0}'.format(e))
+
+ @contextlib.contextmanager
+ def _accept_request(self):
+ with contextlib.closing(self._server_socket.accept()[0]) as connection:
+ message = _recv_message(connection)
+ response = {}
+ yield message, response
+ _send_message(connection, response)
+
+ def _handle_task_started_request(self, task_id, **kwargs):
+ self._task_started(self._tasks[task_id])
+
+ def _handle_task_succeeded_request(self, task_id, request, **kwargs):
+ task = self._remove_task(task_id)
+ try:
+ self._apply_tracked_changes(task, request)
+ except BaseException as e:
+ e.message += UPDATE_TRACKED_CHANGES_FAILED_STR
+ self._task_failed(task, exception=e)
+ else:
+ self._task_succeeded(task)
+
+ def _handle_task_failed_request(self, task_id, request, **kwargs):
+ task = self._remove_task(task_id)
+ try:
+ self._apply_tracked_changes(task, request)
+ except BaseException as e:
+ e.message += 'Task failed due to {0}.'.format(request['exception']) + \
+ UPDATE_TRACKED_CHANGES_FAILED_STR
+ self._task_failed(task, exception=e)
+ else:
+ self._task_failed(task, exception=request['exception'])
+
+ def _handle_apply_tracked_changes_request(self, task_id, request, response):
+ task = self._tasks[task_id]
+ try:
+ self._apply_tracked_changes(task, request)
+ except BaseException as e:
+ response['exception'] = exceptions.wrap_if_needed(e)
+
+ @staticmethod
+ def _apply_tracked_changes(task, request):
+ instrumentation.apply_tracked_changes(
+ tracked_changes=request['tracked_changes'],
+ model=task.context.model)
+
+
+def _send_message(connection, message):
+ data = jsonpickle.dumps(message)
+ connection.send(struct.pack(_INT_FMT, len(data)))
+ connection.sendall(data)
+
+
+def _recv_message(connection):
+ message_len, = struct.unpack(_INT_FMT, _recv_bytes(connection, _INT_SIZE))
+ return jsonpickle.loads(_recv_bytes(connection, message_len))
+
+
+def _recv_bytes(connection, count):
+ result = io.BytesIO()
+ while True:
+ if not count:
+ return result.getvalue()
+ read = connection.recv(count)
+ if not read:
+ return result.getvalue()
+ result.write(read)
+ count -= len(read)
+
class _Messenger(object):
@@ -261,17 +302,16 @@ class _Messenger(object):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', self.port))
try:
- data = jsonpickle.dumps({
+ _send_message(sock, {
'type': type,
'task_id': self.task_id,
'exception': exceptions.wrap_if_needed(exception),
'tracked_changes': tracked_changes
})
- sock.send(struct.pack(_INT_FMT, len(data)))
- sock.sendall(data)
- # send message will block until the server side closes the connection socket
- # because we want it to be synchronous
- sock.recv(1)
+ response = _recv_message(sock)
+ response_exception = response.get('exception')
+ if response_exception:
+ raise response_exception
finally:
sock.close()
@@ -294,12 +334,17 @@ def _patch_session(ctx, messenger, instrument):
messenger.apply_tracked_changes(instrument.tracked_changes)
instrument.clear()
+ def patched_rollback():
+ # Rollback is performed on parent process when commit fails
+ pass
+
# when autoflush is set to true (the default), refreshing an object will trigger
# an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so
# far on the session. this is not the desired behavior in the subprocess
session.autoflush = False
session.commit = patched_commit
+ session.rollback = patched_rollback
session.refresh = patched_refresh
@@ -324,7 +369,6 @@ def _main():
# This is required for the instrumentation work properly.
# See docstring of `remove_mutable_association_listener` for further details
storage_type.remove_mutable_association_listener()
-
with instrumentation.track_changes() as instrument:
try:
ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 57fe9bd..8fb9d82 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -14,11 +14,16 @@
# limitations under the License.
import copy
+import json
+import sqlalchemy
import sqlalchemy.event
+from . import exceptions
+
from .modeling import model as _model
+_VERSION_ID_COL = 'version'
_STUB = object()
_INSTRUMENTED = {
_model.Node.runtime_properties: dict
@@ -92,6 +97,11 @@ class _Instrumentation(object):
mapi_name = instrumented_class.__modelname__
tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
tracked_attributes = tracked_instances.setdefault(target.id, {})
+ if hasattr(target, _VERSION_ID_COL):
+ # We want to keep track of the initial version id so it can be compared
+ # with the committed version id when the tracked changes are applied
+ tracked_attributes.setdefault(_VERSION_ID_COL,
+ _Value(_STUB, getattr(target, _VERSION_ID_COL)))
for attribute_name, attribute_type in instrumented_attributes.items():
if attribute_name not in tracked_attributes:
initial = getattr(target, attribute_name)
@@ -143,7 +153,11 @@ class _Value(object):
return self.initial == other.initial and self.current == other.current
def __hash__(self):
- return hash(self.initial) ^ hash(self.current)
+ return hash((self.initial, self.current))
+
+ @property
+ def dict(self):
+ return {'initial': self.initial, 'current': self.current}.copy()
def apply_tracked_changes(tracked_changes, model):
@@ -153,14 +167,49 @@ def apply_tracked_changes(tracked_changes, model):
returned by calling ``track_changes()``
:param model: The model storage used to actually apply the changes
"""
- for mapi_name, tracked_instances in tracked_changes.items():
- mapi = getattr(model, mapi_name)
- for instance_id, tracked_attributes in tracked_instances.items():
- instance = None
- for attribute_name, value in tracked_attributes.items():
- if value.initial != value.current:
- if not instance:
- instance = mapi.get(instance_id)
- setattr(instance, attribute_name, value.current)
- if instance:
- mapi.update(instance)
+ successfully_updated_changes = dict()
+ try:
+ for mapi_name, tracked_instances in tracked_changes.items():
+ successfully_updated_changes[mapi_name] = dict()
+ mapi = getattr(model, mapi_name)
+ for instance_id, tracked_attributes in tracked_instances.items():
+ successfully_updated_changes[mapi_name][instance_id] = dict()
+ instance = None
+ for attribute_name, value in tracked_attributes.items():
+ if value.initial != value.current:
+ if not instance:
+ instance = mapi.get(instance_id)
+ setattr(instance, attribute_name, value.current)
+ if instance:
+ _validate_version_id(instance, mapi)
+ mapi.update(instance)
+ successfully_updated_changes[mapi_name][instance_id] = [
+ v.dict for v in tracked_attributes.values()]
+ except BaseException:
+ for key, value in successfully_updated_changes.items():
+ if not value:
+ del successfully_updated_changes[key]
+ model.logger.error(
+ 'Registering all the changes to the storage has failed. \n'
+ 'The successful updates were: \n '
+ '{0}'.format(json.dumps(successfully_updated_changes, indent=4)))
+
+ raise
+
+
+def _validate_version_id(instance, mapi):
+ version_id = sqlalchemy.inspect(instance).committed_state.get(_VERSION_ID_COL)
+ # There are two version conflict code paths:
+ # 1. The instance committed state loaded already holds a newer version,
+ # in this case, we manually raise the error
+ # 2. The UPDATE statement is executed with version validation and sqlalchemy
+ # will raise a StateDataError if there is a version mismatch.
+ if version_id and getattr(instance, _VERSION_ID_COL) != version_id:
+ object_version_id = getattr(instance, _VERSION_ID_COL)
+ mapi._session.rollback()
+ raise exceptions.StorageError(
+ 'Version conflict: committed and object {0} differ '
+ '[committed {0}={1}, object {0}={2}]'
+ .format(_VERSION_ID_COL,
+ version_id,
+ object_version_id))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/modeling/instance_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/instance_elements.py b/aria/storage/modeling/instance_elements.py
index 0666c8a..2b102f1 100644
--- a/aria/storage/modeling/instance_elements.py
+++ b/aria/storage/modeling/instance_elements.py
@@ -553,7 +553,7 @@ class PolicyBase(structure.ModelMixin):
# region many-to-one relationships
@declared_attr
- def service_instnce(cls):
+ def service_instance(cls):
return cls.many_to_one_relationship('service_instance')
# region many-to-many relationships
@@ -851,6 +851,8 @@ class NodeBase(structure.ModelMixin):
* :code:`relationships`: List of :class:`Relationship`
"""
__tablename__ = 'node'
+ version = Column(Integer, nullable=False)
+ __mapper_args__ = {'version_id_col': version}
__private_fields__ = ['service_instance_fk',
'host_fk',
@@ -878,7 +880,6 @@ class NodeBase(structure.ModelMixin):
runtime_properties = Column(aria_types.Dict)
scaling_groups = Column(aria_types.List)
state = Column(Text, nullable=False)
- version = Column(Integer, default=1)
@declared_attr
def plugins(cls):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
index b80ac8e..2711aee 100644
--- a/aria/storage/sql_mapi.py
+++ b/aria/storage/sql_mapi.py
@@ -23,6 +23,7 @@ from sqlalchemy import (
orm,
)
from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.orm.exc import StaleDataError
from aria.utils.collections import OrderedDict
from . import (
@@ -162,6 +163,9 @@ class SQLAlchemyModelAPI(api.ModelAPI):
"""
try:
self._session.commit()
+ except StaleDataError as e:
+ self._session.rollback()
+ raise exceptions.StorageError('Version conflict: {0}'.format(str(e)))
except (SQLAlchemyError, ValueError) as e:
self._session.rollback()
raise exceptions.StorageError('SQL Storage error: {0}'.format(str(e)))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/aria/storage_initializer.py
----------------------------------------------------------------------
diff --git a/aria/storage_initializer.py b/aria/storage_initializer.py
index aea5ec8..175ec22 100644
--- a/aria/storage_initializer.py
+++ b/aria/storage_initializer.py
@@ -95,7 +95,6 @@ def _create_node_instance(service_instance, node, node_model):
service_instance=service_instance,
name=node_model.id,
runtime_properties={},
- version=None,
node_template=node,
state='',
scaling_groups=[]
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 047526a..301fc01 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -50,7 +50,6 @@ def get_dependency_node_instance(dependency_node, deployment):
name=DEPENDENCY_NODE_INSTANCE_NAME,
service_instance=deployment,
runtime_properties={'ip': '1.1.1.1'},
- version=None,
node_template=dependency_node,
state='',
scaling_groups=[]
@@ -96,7 +95,6 @@ def get_dependent_node_instance(dependent_node, deployment):
name=DEPENDENT_NODE_INSTANCE_NAME,
service_instance=deployment,
runtime_properties={},
- version=None,
node_template=dependent_node,
state='',
scaling_groups=[],
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
new file mode 100644
index 0000000..ad3cb76
--- /dev/null
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -0,0 +1,164 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+import fasteners
+import pytest
+
+from aria.storage.exceptions import StorageError
+from aria.orchestrator import events
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests.orchestrator.context import execute as execute_workflow
+from tests.orchestrator.workflows.helpers import events_collector
+from tests import mock
+from tests import storage
+
+
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files):
+ _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True)
+
+
+@operation
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value):
+ _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
+
+
+def test_concurrent_modification_on_task_failed(context, executor, lock_files):
+ _test(context, executor, lock_files, _test_task_failed, expected_failure=True)
+
+
+@operation
+def _test_task_failed(ctx, lock_files, key, first_value, second_value):
+ first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
+ if not first:
+ raise RuntimeError('MESSAGE')
+
+
+def test_concurrent_modification_on_update_and_refresh(context, executor, lock_files):
+ _test(context, executor, lock_files, _test_update_and_refresh, expected_failure=False)
+
+
+@operation
+def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value):
+ node = ctx.node
+ first = _concurrent_update(lock_files, node, key, first_value, second_value)
+ if not first:
+ try:
+ ctx.model.node.update(node)
+ except StorageError as e:
+ assert 'Version conflict' in str(e)
+ ctx.model.node.refresh(node)
+ else:
+ raise RuntimeError('Unexpected')
+
+
+def _test(context, executor, lock_files, func, expected_failure):
+ def _node(ctx):
+ return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+
+ op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+ key = 'key'
+ first_value = 'value1'
+ second_value = 'value2'
+ inputs = {
+ 'lock_files': lock_files,
+ 'key': key,
+ 'first_value': first_value,
+ 'second_value': second_value
+ }
+
+ node = _node(context)
+ node.interfaces = [mock.models.get_interface(
+ op_name,
+ operation_kwargs=dict(implementation='{0}.{1}'.format(__name__, func.__name__))
+ )]
+ context.model.node.update(node)
+
+ @workflow
+ def mock_workflow(graph, **_):
+ graph.add_tasks(
+ api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs),
+ api.task.OperationTask.node(instance=node, name=op_name, inputs=inputs)
+ )
+
+ signal = events.on_failure_task_signal
+ with events_collector(signal) as collected:
+ try:
+ execute_workflow(mock_workflow, context, executor)
+ except ExecutorException:
+ pass
+
+ props = _node(context).runtime_properties
+ assert props[key] == first_value
+
+ exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
+ if expected_failure:
+ assert exceptions
+ exception = exceptions[-1]
+ assert isinstance(exception, StorageError)
+ assert 'Version conflict' in str(exception)
+ else:
+ assert not exceptions
+
+
+@pytest.fixture
+def executor():
+ result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+ yield result
+ result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+ result = mock.context.simple(str(tmpdir))
+ yield result
+ storage.release_sqlite_storage(result.model)
+
+
+@pytest.fixture
+def lock_files(tmpdir):
+ return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
+
+
+def _concurrent_update(lock_files, node, key, first_value, second_value):
+
+ locker1 = fasteners.InterProcessLock(lock_files[0])
+ locker2 = fasteners.InterProcessLock(lock_files[1])
+
+ first = locker1.acquire(blocking=False)
+
+ if first:
+ # Give chance for both processes to acquire locks
+ while locker2.acquire(blocking=False):
+ locker2.release()
+ time.sleep(0.01)
+ else:
+ locker2.acquire()
+
+ node.runtime_properties[key] = first_value if first else second_value
+
+ if first:
+ locker1.release()
+ else:
+ with locker1:
+ locker2.release()
+
+ return first
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 0e4740f..2f0245a 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -11,8 +11,9 @@
# limitations under the License.
testtools
+fasteners==0.13.0
mock==1.0.1
pylint==1.6.4
pytest==3.0.2
pytest-cov==2.3.1
-pytest-mock==1.2
\ No newline at end of file
+pytest-mock==1.2
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96581d9f/tests/storage/test_structures.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_structures.py b/tests/storage/test_structures.py
index 30f0064..666256e 100644
--- a/tests/storage/test_structures.py
+++ b/tests/storage/test_structures.py
@@ -125,7 +125,6 @@ def test_relationship_model_ordering(context):
name='new_node_instance',
runtime_properties={},
service_instance=service_instance,
- version=None,
node_template=new_node_template,
state='',
scaling_groups=[]