You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/17 10:11:01 UTC
[2/2] incubator-ariatosca git commit: wip
wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ebd1ef85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ebd1ef85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ebd1ef85
Branch: refs/heads/ARIA-258-Convert-runtime-properties-to-attributes
Commit: ebd1ef85cc7be8910d214bcd3fc70db1ae77743d
Parents: fdd57c4
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun May 14 22:38:39 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed May 17 13:04:38 2017 +0300
----------------------------------------------------------------------
aria/modeling/service_common.py | 9 +-
aria/modeling/types.py | 20 -
aria/orchestrator/context/common.py | 163 ++++++++
aria/orchestrator/context/operation.py | 10 +-
.../execution_plugin/ctx_proxy/server.py | 1 +
aria/orchestrator/workflows/core/engine.py | 1 -
aria/orchestrator/workflows/executor/process.py | 124 +-----
aria/storage/instrumentation.py | 282 -------------
tests/helpers.py | 10 +
tests/orchestrator/context/test_operation.py | 204 +++++++++-
.../execution_plugin/test_ctx_proxy_server.py | 3 +-
.../orchestrator/execution_plugin/test_local.py | 66 ++--
tests/orchestrator/execution_plugin/test_ssh.py | 36 +-
tests/orchestrator/workflows/core/test_task.py | 2 +-
.../orchestrator/workflows/executor/__init__.py | 4 +
...process_executor_concurrent_modifications.py | 67 ++--
.../executor/test_process_executor_extension.py | 6 +-
.../test_process_executor_tracked_changes.py | 56 ++-
tests/resources/scripts/test_ssh.sh | 30 +-
tests/storage/test_instrumentation.py | 396 -------------------
20 files changed, 539 insertions(+), 951 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/aria/modeling/service_common.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_common.py b/aria/modeling/service_common.py
index e9c96a4..ef19c8e 100644
--- a/aria/modeling/service_common.py
+++ b/aria/modeling/service_common.py
@@ -218,14 +218,13 @@ class ParameterBase(TemplateModelMixin, caching.HasCachedMethods):
:type description: basestring
"""
- from . import models
type_name = canonical_type_name(value)
if type_name is None:
type_name = full_type_name(value)
- return models.Parameter(name=name, # pylint: disable=unexpected-keyword-arg
- type_name=type_name,
- value=value,
- description=description)
+ return cls(name=name, # pylint: disable=unexpected-keyword-arg
+ type_name=type_name,
+ value=value,
+ description=description)
class TypeBase(InstanceModelMixin):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/aria/modeling/types.py
----------------------------------------------------------------------
diff --git a/aria/modeling/types.py b/aria/modeling/types.py
index 7460f47..920a0c2 100644
--- a/aria/modeling/types.py
+++ b/aria/modeling/types.py
@@ -286,24 +286,4 @@ _LISTENER_ARGS = (mutable.mapper, 'mapper_configured', _mutable_association_list
def _register_mutable_association_listener():
event.listen(*_LISTENER_ARGS)
-
-def remove_mutable_association_listener():
- """
- Remove the event listener that associates ``Dict`` and ``List`` column types with
- ``MutableDict`` and ``MutableList``, respectively.
-
- This call must happen before any model instance is instantiated.
- This is because once it does, that would trigger the listener we are trying to remove.
- Once it is triggered, many other listeners will then be registered.
- At that point, it is too late.
-
- The reason this function exists is that the association listener, interferes with ARIA change
- tracking instrumentation, so a way to disable it is required.
-
- Note that the event listener this call removes is registered by default.
- """
- if event.contains(*_LISTENER_ARGS):
- event.remove(*_LISTENER_ARGS)
-
-
_register_mutable_association_listener()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 0854a27..83f7215 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -18,6 +18,7 @@ A common context for both workflow and operation
"""
import logging
+import collections
from contextlib import contextmanager
from functools import partial
@@ -195,3 +196,165 @@ class BaseContext(object):
variables.setdefault('ctx', self)
resource_template = jinja2.Template(resource_content)
return resource_template.render(variables)
+
+ def _teardown_db_resources(self):
+ self.model.log._session.close()
+ self.model.log._engine.dispose()
+
+class _Dict(collections.MutableMapping):
+ def __init__(self, actor, model, nested=None):
+ super(_Dict, self).__init__()
+ self._actor = actor
+ self._attributes = self._actor.attributes
+ self._model = model
+ self._attr_cls = self._model.parameter.model_cls
+ self._nested = nested or []
+
+ def __delitem__(self, key):
+ del self._nested_value[key]
+
+ def __contains__(self, item):
+ for key in self.keys():
+ if item == key:
+ return True
+ return False
+
+ def __len__(self):
+ return len(self._nested_value)
+
+ def __nonzero__(self):
+ return bool(self._nested_value)
+
+ def __getitem__(self, item):
+ if self._nested:
+ value = self._nested_value[item]
+ else:
+ value = self._attributes[item].value
+ if isinstance(value, dict):
+ return _Dict(self._actor, self._model, nested=self._nested + [item])
+ elif isinstance(value, self._attr_cls):
+ return value.value
+ return value
+
+ def __setitem__(self, key, value):
+ if self._nested or key in self._attributes:
+ attribute = self._update_attr(key, value)
+ self._model.parameter.update(attribute)
+ else:
+ attr = self._attr_cls.wrap(key, value)
+ self._attributes[key] = attr
+ self._model.parameter.put(attr)
+
+ @property
+ def _nested_value(self):
+ current = self._attributes
+ for k in self._nested:
+ current = current[k]
+ return current.value if isinstance(current, self._attr_cls) else current
+
+ def _update_attr(self, key, value):
+ current = self._attributes
+
+ # If this is nested, let's extract the Parameter itself
+ if self._nested:
+ attribute = current = current[self._nested[0]]
+ for k in self._nested[1:]:
+ current = current[k]
+ if isinstance(current, self._attr_cls):
+ current.value[key] = value
+ else:
+ current[key] = value
+ elif isinstance(current[key], self._attr_cls):
+ attribute = current[key]
+ attribute.value = value
+ else:
+ raise BaseException()
+
+ # This is a user-defined parameter, so its changes are not tracked; we override the
+ # entire value instead.
+ if isinstance(attribute.value, dict):
+ value = attribute.value.copy()
+ attribute.value.clear()
+ attribute.value = value
+ return attribute
+
+ def _unwrap(self, attr):
+ return attr.unwrap() if isinstance(attr, self._attr_cls) else attr
+
+ def keys(self):
+ dict_ = (self._nested_value.value
+ if isinstance(self._nested_value, self._attr_cls)
+ else self._nested_value)
+ for key in dict_.keys():
+ yield key
+
+ def values(self):
+ for val in self._nested_value.values():
+ if isinstance(val, self._attr_cls):
+ yield val.value
+ else:
+ yield val
+
+ def items(self):
+ for key in self._nested_value:
+ val = self._nested_value[key]
+ if isinstance(val, self._attr_cls):
+ yield key, val.value
+ else:
+ yield key, val
+
+ def __dict__(self):
+ return dict(item for item in self.items())
+
+ def __iter__(self):
+ for key in self._nested_value.keys():
+ yield key
+
+ def __copy__(self):
+ return dict((k, v) for k, v in self.items())
+
+ def __deepcopy__(self, *args, **kwargs):
+ return self.__copy__()
+
+ def copy(self):
+ return self.__copy__()
+
+ def clear(self):
+ self._nested_value.clear()
+
+ def update(self, dict_=None, **kwargs):
+ if dict_:
+ for key, value in dict_.items():
+ self[key] = value
+
+ for key, value in kwargs.items():
+ self[key] = value
+
+
+class DecorateAttributes(dict):
+
+ def __init__(self, func):
+ super(DecorateAttributes, self).__init__()
+ self._func = func
+ self._attributes = None
+ self._actor = None
+
+ @property
+ def attributes(self):
+ return self._attributes
+
+ @property
+ def actor(self):
+ return self._actor
+
+ def __getattr__(self, item):
+ try:
+ return getattr(self._actor, item)
+ except AttributeError:
+ return super(DecorateAttributes, self).__getattribute__(item)
+
+ def __call__(self, *args, **kwargs):
+ func_self = args[0]
+ self._actor = self._func(*args, **kwargs)
+ self._attributes = _Dict(self._actor, func_self.model)
+ return self
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 68a02aa..f4e8813 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -21,10 +21,10 @@ import threading
import aria
from aria.utils import file
-from .common import BaseContext
+from . import common
-class BaseOperationContext(BaseContext):
+class BaseOperationContext(common.BaseContext):
"""
Context object used during operation creation and execution
"""
@@ -105,6 +105,7 @@ class NodeOperationContext(BaseOperationContext):
"""
@property
+ @common.DecorateAttributes
def node_template(self):
"""
the node of the current operation
@@ -113,6 +114,7 @@ class NodeOperationContext(BaseOperationContext):
return self.node.node_template
@property
+ @common.DecorateAttributes
def node(self):
"""
The node instance of the current operation
@@ -127,6 +129,7 @@ class RelationshipOperationContext(BaseOperationContext):
"""
@property
+ @common.DecorateAttributes
def source_node_template(self):
"""
The source node
@@ -135,6 +138,7 @@ class RelationshipOperationContext(BaseOperationContext):
return self.source_node.node_template
@property
+ @common.DecorateAttributes
def source_node(self):
"""
The source node instance
@@ -143,6 +147,7 @@ class RelationshipOperationContext(BaseOperationContext):
return self.relationship.source_node
@property
+ @common.DecorateAttributes
def target_node_template(self):
"""
The target node
@@ -151,6 +156,7 @@ class RelationshipOperationContext(BaseOperationContext):
return self.target_node.node_template
@property
+ @common.DecorateAttributes
def target_node(self):
"""
The target node instance
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 52a5312..954f85e 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -114,6 +114,7 @@ class CtxProxy(object):
'payload': payload
}, cls=modeling.utils.ModelJSONEncoder)
except Exception as e:
+ self.ctx.model.log._session.rollback()
traceback_out = StringIO.StringIO()
traceback.print_exc(file=traceback_out)
payload = {
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 561265c..3a96804 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -69,7 +69,6 @@ class Engine(logger.LoggerMixin):
else:
events.on_success_workflow_signal.send(self._workflow_context)
except BaseException as e:
-
events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
raise
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 824c4e1..e83584b 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -43,14 +43,12 @@ import jsonpickle
import aria
from aria.orchestrator.workflows.executor import base
-from aria.storage import instrumentation
from aria.extension import process_executor
from aria.utils import (
imports,
exceptions,
process as process_utils
)
-from aria.modeling import types as modeling_types
_INT_FMT = 'I'
@@ -82,7 +80,6 @@ class ProcessExecutor(base.BaseExecutor):
'started': self._handle_task_started_request,
'succeeded': self._handle_task_succeeded_request,
'failed': self._handle_task_failed_request,
- 'apply_tracked_changes': self._handle_apply_tracked_changes_request
}
# Server socket used to accept task status messages from subprocesses
@@ -196,41 +193,13 @@ class ProcessExecutor(base.BaseExecutor):
def _handle_task_started_request(self, task_id, **kwargs):
self._task_started(self._tasks[task_id])
- def _handle_task_succeeded_request(self, task_id, request, **kwargs):
+ def _handle_task_succeeded_request(self, task_id, **kwargs):
task = self._remove_task(task_id)
- try:
- self._apply_tracked_changes(task, request)
- except BaseException as e:
- e.message += UPDATE_TRACKED_CHANGES_FAILED_STR
- self._task_failed(task, exception=e)
- else:
- self._task_succeeded(task)
+ self._task_succeeded(task)
def _handle_task_failed_request(self, task_id, request, **kwargs):
task = self._remove_task(task_id)
- try:
- self._apply_tracked_changes(task, request)
- except BaseException as e:
- e.message += 'Task failed due to {0}.'.format(request['exception']) + \
- UPDATE_TRACKED_CHANGES_FAILED_STR
- self._task_failed(
- task, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info()))
- else:
- self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
-
- def _handle_apply_tracked_changes_request(self, task_id, request, response):
- task = self._tasks[task_id]
- try:
- self._apply_tracked_changes(task, request)
- except BaseException as e:
- response['exception'] = exceptions.wrap_if_needed(e)
-
- @staticmethod
- def _apply_tracked_changes(task, request):
- instrumentation.apply_tracked_changes(
- tracked_changes=request['tracked_changes'],
- new_instances=request['new_instances'],
- model=task.context.model)
+ self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
def _send_message(connection, message):
@@ -278,28 +247,19 @@ class _Messenger(object):
"""Task started message"""
self._send_message(type='started')
- def succeeded(self, tracked_changes, new_instances):
+ def succeeded(self):
"""Task succeeded message"""
- self._send_message(
- type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances)
+ self._send_message(type='succeeded')
- def failed(self, tracked_changes, new_instances, exception):
+ def failed(self, exception):
"""Task failed message"""
- self._send_message(type='failed',
- tracked_changes=tracked_changes,
- new_instances=new_instances,
- exception=exception)
-
- def apply_tracked_changes(self, tracked_changes, new_instances):
- self._send_message(type='apply_tracked_changes',
- tracked_changes=tracked_changes,
- new_instances=new_instances)
+ self._send_message(type='failed', exception=exception)
def closed(self):
"""Executor closed message"""
self._send_message(type='closed')
- def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None):
+ def _send_message(self, type, exception=None):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', self.port))
try:
@@ -308,8 +268,6 @@ class _Messenger(object):
'task_id': self.task_id,
'exception': exceptions.wrap_if_needed(exception),
'traceback': exceptions.get_exception_as_string(*sys.exc_info()),
- 'tracked_changes': tracked_changes or {},
- 'new_instances': new_instances or {}
})
response = _recv_message(sock)
response_exception = response.get('exception')
@@ -319,39 +277,6 @@ class _Messenger(object):
sock.close()
-def _patch_ctx(ctx, messenger, instrument):
- # model will be None only in tests that test the executor component directly
- if not ctx.model:
- return
-
- # We arbitrarily select the ``node`` mapi to extract the session from it.
- # could have been any other mapi just as well
- session = ctx.model.node._session
- original_refresh = session.refresh
-
- def patched_refresh(target):
- instrument.clear(target)
- original_refresh(target)
-
- def patched_commit():
- messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances)
- instrument.expunge_session()
- instrument.clear()
-
- def patched_rollback():
- # Rollback is performed on parent process when commit fails
- instrument.expunge_session()
-
- # when autoflush is set to true (the default), refreshing an object will trigger
- # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so
- # far on the session. this is not the desired behavior in the subprocess
- session.autoflush = False
-
- session.commit = patched_commit
- session.rollback = patched_rollback
- session.refresh = patched_refresh
-
-
def _main():
arguments_json_path = sys.argv[1]
with open(arguments_json_path) as f:
@@ -369,32 +294,23 @@ def _main():
operation_inputs = arguments['operation_inputs']
context_dict = arguments['context']
- # This is required for the instrumentation work properly.
- # See docstring of `remove_mutable_association_listener` for further details
- modeling_types.remove_mutable_association_listener()
try:
ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
except BaseException as e:
- messenger.failed(exception=e, tracked_changes=None, new_instances=None)
+ messenger.failed(e)
return
- with instrumentation.track_changes(ctx.model) as instrument:
- try:
- messenger.started()
- _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument)
- task_func = imports.load_attribute(implementation)
- aria.install_aria_extensions()
- for decorate in process_executor.decorate():
- task_func = decorate(task_func)
- task_func(ctx=ctx, **operation_inputs)
- messenger.succeeded(tracked_changes=instrument.tracked_changes,
- new_instances=instrument.new_instances)
- except BaseException as e:
- messenger.failed(exception=e,
- tracked_changes=instrument.tracked_changes,
- new_instances=instrument.new_instances)
- finally:
- instrument.expunge_session()
+ try:
+ messenger.started()
+ task_func = imports.load_attribute(implementation)
+ aria.install_aria_extensions()
+ for decorate in process_executor.decorate():
+ task_func = decorate(task_func)
+ task_func(ctx=ctx, **operation_inputs)
+ messenger.succeeded()
+ except BaseException as e:
+ ctx._teardown_db_resources()
+ messenger.failed(e)
if __name__ == '__main__':
_main()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
deleted file mode 100644
index 390f933..0000000
--- a/aria/storage/instrumentation.py
+++ /dev/null
@@ -1,282 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import copy
-import json
-import os
-
-import sqlalchemy.event
-
-from ..modeling import models as _models
-from ..storage.exceptions import StorageError
-
-
-_VERSION_ID_COL = 'version'
-_STUB = object()
-_INSTRUMENTED = {
- 'modified': {
- _models.Node.runtime_properties: dict,
- _models.Node.state: str,
- _models.Task.status: str,
- },
- 'new': (_models.Log, )
-
-}
-
-_NEW_INSTANCE = 'NEW_INSTANCE'
-
-
-def track_changes(model=None, instrumented=None):
- """Track changes in the specified model columns
-
- This call will register event listeners using sqlalchemy's event mechanism. The listeners
- instrument all returned objects such that the attributes specified in ``instrumented``, will
- be replaced with a value that is stored in the returned instrumentation context
- ``tracked_changes`` property.
-
- Why should this be implemented when sqlalchemy already does a fantastic job at tracking changes
- you ask? Well, when sqlalchemy is used with sqlite, due to how sqlite works, only one process
- can hold a write lock to the database. This does not work well when ARIA runs tasks in
- subprocesses (by the process executor) and these tasks wish to change some state as well. These
- tasks certainly deserve a chance to do so!
-
- To enable this, the subprocess calls ``track_changes()`` before any state changes are made.
- At the end of the subprocess execution, it should return the ``tracked_changes`` attribute of
- the instrumentation context returned from this call, to the parent process. The parent process
- will then call ``apply_tracked_changes()`` that resides in this module as well.
- At that point, the changes will actually be written back to the database.
-
- :param model: the model storage. it should hold a mapi for each model. the session of each mapi
- is needed to setup events
- :param instrumented: A dict from model columns to their python native type
- :return: The instrumentation context
- """
- return _Instrumentation(model, instrumented or _INSTRUMENTED)
-
-
-class _Instrumentation(object):
-
- def __init__(self, model, instrumented):
- self.tracked_changes = {}
- self.new_instances = {}
- self.listeners = []
- self._instances_to_expunge = []
- self._model = model
- self._track_changes(instrumented)
-
- @property
- def _new_instance_id(self):
- return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE,
- index=len(self._instances_to_expunge))
-
- def expunge_session(self):
- for new_instance in self._instances_to_expunge:
- self._get_session_from_model(new_instance.__tablename__).expunge(new_instance)
-
- def _get_session_from_model(self, tablename):
- mapi = getattr(self._model, tablename, None)
- if mapi:
- return mapi._session
- raise StorageError("Could not retrieve session for {0}".format(tablename))
-
- def _track_changes(self, instrumented):
- instrumented_attribute_classes = {}
- # Track any newly-set attributes.
- for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
- self._register_set_attribute_listener(
- instrumented_attribute=instrumented_attribute,
- attribute_type=attribute_type)
- instrumented_class = instrumented_attribute.parent.entity
- instrumented_class_attributes = instrumented_attribute_classes.setdefault(
- instrumented_class, {})
- instrumented_class_attributes[instrumented_attribute.key] = attribute_type
-
- # Track any global instance update such as 'refresh' or 'load'
- for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items():
- self._register_instance_listeners(instrumented_class=instrumented_class,
- instrumented_attributes=instrumented_attributes)
-
- # Track any newly created instances.
- for instrumented_class in instrumented.get('new', {}):
- self._register_new_instance_listener(instrumented_class)
-
- def _register_new_instance_listener(self, instrumented_class):
- if self._model is None:
- raise StorageError("In order to keep track of new instances, a ctx is needed")
-
- def listener(_, instance):
- if not isinstance(instance, instrumented_class):
- return
- self._instances_to_expunge.append(instance)
- tracked_instances = self.new_instances.setdefault(instance.__modelname__, {})
- tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {})
- instance_as_dict = instance.to_dict()
- instance_as_dict.update((k, getattr(instance, k))
- for k in getattr(instance, '__private_fields__', []))
- tracked_attributes.update(instance_as_dict)
- session = self._get_session_from_model(instrumented_class.__tablename__)
- listener_args = (session, 'after_attach', listener)
- sqlalchemy.event.listen(*listener_args)
- self.listeners.append(listener_args)
-
- def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
- def listener(target, value, *_):
- mapi_name = target.__modelname__
- tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
- tracked_attributes = tracked_instances.setdefault(target.id, {})
- if value is None:
- current = None
- else:
- current = copy.deepcopy(attribute_type(value))
- tracked_attributes[instrumented_attribute.key] = _Value(_STUB, current)
- return current
- listener_args = (instrumented_attribute, 'set', listener)
- sqlalchemy.event.listen(*listener_args, retval=True)
- self.listeners.append(listener_args)
-
- def _register_instance_listeners(self, instrumented_class, instrumented_attributes):
- def listener(target, *_):
- mapi_name = instrumented_class.__modelname__
- tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
- tracked_attributes = tracked_instances.setdefault(target.id, {})
- if hasattr(target, _VERSION_ID_COL):
- # We want to keep track of the initial version id so it can be compared
- # with the committed version id when the tracked changes are applied
- tracked_attributes.setdefault(_VERSION_ID_COL,
- _Value(_STUB, getattr(target, _VERSION_ID_COL)))
- for attribute_name, attribute_type in instrumented_attributes.items():
- if attribute_name not in tracked_attributes:
- initial = getattr(target, attribute_name)
- if initial is None:
- current = None
- else:
- current = copy.deepcopy(attribute_type(initial))
- tracked_attributes[attribute_name] = _Value(initial, current)
- target.__dict__[attribute_name] = tracked_attributes[attribute_name].current
- for listener_args in ((instrumented_class, 'load', listener),
- (instrumented_class, 'refresh', listener),
- (instrumented_class, 'refresh_flush', listener)):
- sqlalchemy.event.listen(*listener_args)
- self.listeners.append(listener_args)
-
- def clear(self, target=None):
- if target:
- mapi_name = target.__modelname__
- tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
- tracked_instances.pop(target.id, None)
- else:
- self.tracked_changes.clear()
-
- self.new_instances.clear()
- self._instances_to_expunge = []
-
- def restore(self):
- """Remove all listeners registered by this instrumentation"""
- for listener_args in self.listeners:
- if sqlalchemy.event.contains(*listener_args):
- sqlalchemy.event.remove(*listener_args)
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.restore()
-
-
-class _Value(object):
- # You may wonder why is this a full blown class and not a named tuple. The reason is that
- # jsonpickle that is used to serialize the tracked_changes, does not handle named tuples very
- # well. At the very least, I could not get it to behave.
-
- def __init__(self, initial, current):
- self.initial = initial
- self.current = current
-
- def __eq__(self, other):
- if not isinstance(other, _Value):
- return False
- return self.initial == other.initial and self.current == other.current
-
- def __hash__(self):
- return hash((self.initial, self.current))
-
- @property
- def dict(self):
- return {'initial': self.initial, 'current': self.current}.copy()
-
-
-def apply_tracked_changes(tracked_changes, new_instances, model):
- """Write tracked changes back to the database using provided model storage
-
- :param tracked_changes: The ``tracked_changes`` attribute of the instrumentation context
- returned by calling ``track_changes()``
- :param model: The model storage used to actually apply the changes
- """
- successfully_updated_changes = dict()
- try:
- # handle instance updates
- for mapi_name, tracked_instances in tracked_changes.items():
- successfully_updated_changes[mapi_name] = dict()
- mapi = getattr(model, mapi_name)
- for instance_id, tracked_attributes in tracked_instances.items():
- successfully_updated_changes[mapi_name][instance_id] = dict()
- instance = None
- for attribute_name, value in tracked_attributes.items():
- if value.initial != value.current:
- instance = instance or mapi.get(instance_id)
- setattr(instance, attribute_name, value.current)
- if instance:
- _validate_version_id(instance, mapi)
- mapi.update(instance)
- successfully_updated_changes[mapi_name][instance_id] = [
- v.dict for v in tracked_attributes.values()]
-
- # Handle new instances
- for mapi_name, new_instance in new_instances.items():
- successfully_updated_changes[mapi_name] = dict()
- mapi = getattr(model, mapi_name)
- for new_instance_kwargs in new_instance.values():
- instance = mapi.model_cls(**new_instance_kwargs)
- mapi.put(instance)
- successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs
- except BaseException:
- for key, value in successfully_updated_changes.items():
- if not value:
- del successfully_updated_changes[key]
- # TODO: if the successful has _STUB, the logging fails because it can't serialize the object
- model.logger.error(
- 'Registering all the changes to the storage has failed. {0}'
- 'The successful updates were: {0} '
- '{1}'.format(os.linesep, json.dumps(successfully_updated_changes, indent=4)))
-
- raise
-
-
-def _validate_version_id(instance, mapi):
- version_id = sqlalchemy.inspect(instance).committed_state.get(_VERSION_ID_COL)
- # There are two version conflict code paths:
- # 1. The instance committed state loaded already holds a newer version,
- # in this case, we manually raise the error
- # 2. The UPDATE statement is executed with version validation and sqlalchemy
- # will raise a StateDataError if there is a version mismatch.
- if version_id and getattr(instance, _VERSION_ID_COL) != version_id:
- object_version_id = getattr(instance, _VERSION_ID_COL)
- mapi._session.rollback()
- raise StorageError(
- 'Version conflict: committed and object {0} differ '
- '[committed {0}={1}, object {0}={2}]'
- .format(_VERSION_ID_COL,
- version_id,
- object_version_id))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/helpers.py
----------------------------------------------------------------------
diff --git a/tests/helpers.py b/tests/helpers.py
index 3c3efc9..4c3194b 100644
--- a/tests/helpers.py
+++ b/tests/helpers.py
@@ -47,6 +47,9 @@ class FilesystemDataHolder(object):
with open(self._path, 'w') as f:
return json.dump(value, f)
+ def __contains__(self, item):
+ return item in self._load()
+
def __setitem__(self, key, value):
dict_ = self._load()
dict_[key] = value
@@ -67,6 +70,13 @@ class FilesystemDataHolder(object):
self._dump(dict_)
return return_value
+ def update(self, dict_=None, **kwargs):
+ current_dict = self._load()
+ if dict_:
+ current_dict.update(dict_)
+ current_dict.update(**kwargs)
+ self._dump(current_dict)
+
@property
def path(self):
return self._path
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index cdeb5fa..5ce0b22 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -25,7 +25,9 @@ from aria import (
operation,
)
from aria.orchestrator import context
+from aria.orchestrator.context import common
from aria.orchestrator.workflows import api
+from aria.modeling.models import Parameter
import tests
from tests import (
@@ -263,7 +265,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
@pytest.fixture(params=[
- (thread.ThreadExecutor, {}),
+ # (thread.ThreadExecutor, {}),
(process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
])
def executor(request):
@@ -343,6 +345,68 @@ def test_relationship_operation_logging(ctx, executor):
_assert_loggins(ctx, inputs)
+def test_attribute_consumption(ctx, executor, dataholder):
+ # region Updating node operation
+ node_int_name, node_op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+ source_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+
+ inputs = {'attributes_dict': {'key': 'value'}}
+ interface = mock.models.create_interface(
+ source_node.service,
+ node_int_name,
+ node_op_name,
+ operation_kwargs=dict(
+ implementation=op_path(attribute_altering_operation, module_path=__name__),
+ inputs=inputs)
+ )
+ source_node.interfaces[interface.name] = interface
+ ctx.model.node.update(source_node)
+ # endregion
+
+ # region updating relationship operation
+ rel_int_name, rel_op_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[2]
+
+ relationship = ctx.model.relationship.list()[0]
+ interface = mock.models.create_interface(
+ relationship.source_node.service,
+ rel_int_name,
+ rel_op_name,
+ operation_kwargs=dict(
+ implementation=op_path(attribute_consuming_operation, module_path=__name__),
+ inputs={'holder_path': dataholder.path}
+ )
+ )
+ relationship.interfaces[interface.name] = interface
+ ctx.model.relationship.update(relationship)
+ # endregion
+
+ @workflow
+ def basic_workflow(graph, **_):
+ graph.sequence(
+ api.task.OperationTask(
+ source_node,
+ interface_name=node_int_name,
+ operation_name=node_op_name,
+ inputs=inputs
+ ),
+ api.task.OperationTask(
+ relationship,
+ interface_name=rel_int_name,
+ operation_name=rel_op_name,
+ )
+ )
+
+ execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+ target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+ assert len(source_node.attributes) == len(target_node.attributes) == 1
+ assert source_node.attributes['key'] != target_node.attributes['key']
+ assert source_node.attributes['key'].value == \
+ target_node.attributes['key'].value == \
+ dataholder['key']
+
+
def _assert_loggins(ctx, inputs):
# The logs should contain the following: Workflow Start, Operation Start, custom operation
@@ -377,10 +441,10 @@ def _assert_loggins(ctx, inputs):
@operation
def logged_operation(ctx, **_):
- ctx.logger.info(ctx.task.inputs['op_start'])
+ ctx.logger.info(ctx.task.inputs['op_start'].value)
# enables to check the relation between the created_at field properly
time.sleep(1)
- ctx.logger.debug(ctx.task.inputs['op_end'])
+ ctx.logger.debug(ctx.task.inputs['op_end'].value)
@operation
@@ -422,3 +486,137 @@ def get_node_id(ctx, holder_path, **_):
def _test_plugin_workdir(ctx, filename, content):
with open(os.path.join(ctx.plugin_workdir, filename), 'w') as f:
f.write(content)
+
+
+@operation
+def attribute_altering_operation(ctx, attributes_dict, **_):
+ ctx.node.attributes.update(attributes_dict)
+
+
+@operation
+def attribute_consuming_operation(ctx, holder_path, **_):
+ holder = helpers.FilesystemDataHolder(holder_path)
+ ctx.target_node.attributes.update(ctx.source_node.attributes)
+ holder.update(**ctx.source_node.attributes)
+
+
+class MockActor(object):
+ def __init__(self):
+ self.attributes = {}
+
+
+class MockModel(object):
+
+ def __init__(self):
+ self.parameter = type('MockModel', (object, ), {'model_cls': Parameter,
+ 'put': lambda *args, **kwargs: None,
+ 'update': lambda *args, **kwargs: None})()
+
+
+class TestDict(object):
+
+ @pytest.fixture
+ def actor(self):
+ return MockActor()
+
+ @pytest.fixture
+ def model(self):
+ return MockModel()
+
+ def test_keys(self, model, actor):
+ dict_ = common._Dict(actor, model)
+ actor.attributes.update(
+ {
+ 'key1': Parameter.wrap('key1', 'value1'),
+ 'key2': Parameter.wrap('key1', 'value2')
+ }
+ )
+ assert sorted(dict_.keys()) == sorted(['key1', 'key2'])
+
+ def test_values(self, model, actor):
+ dict_ = common._Dict(actor, model)
+ actor.attributes.update({
+ 'key1': Parameter.wrap('key1', 'value1'),
+ 'key2': Parameter.wrap('key1', 'value2')
+ })
+ assert sorted(dict_.values()) == sorted(['value1', 'value2'])
+
+ def test_items(self, actor, model):
+ dict_ = common._Dict(actor, model)
+ actor.attributes.update({
+ 'key1': Parameter.wrap('key1', 'value1'),
+ 'key2': Parameter.wrap('key1', 'value2')
+ })
+ assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')])
+
+ def test_iter(self, actor, model):
+ dict_ = common._Dict(actor, model)
+ actor.attributes.update({
+ 'key1': Parameter.wrap('key1', 'value1'),
+ 'key2': Parameter.wrap('key1', 'value2')
+ })
+ assert sorted(list(dict_)) == sorted(['key1', 'key2'])
+
+ def test_bool(self, actor, model):
+ dict_ = common._Dict(actor, model)
+ assert not dict_
+ actor.attributes.update({
+ 'key1': Parameter.wrap('key1', 'value1'),
+ 'key2': Parameter.wrap('key1', 'value2')
+ })
+ assert dict_
+
+ def test_set_item(self, actor, model):
+ dict_ = common._Dict(actor, model)
+ dict_['key1'] = Parameter.wrap('key1', 'value1')
+ assert 'key1' in dict_
+ assert isinstance(dict_._attributes['key1'], Parameter)
+ assert dict_['key1'] == 'value1'
+
+ dict_['key1'] = {}
+ dict_['key1']['inner_key'] = 'value2'
+
+ assert isinstance(dict_._attributes['key1'], Parameter)
+ assert len(dict_) == 1
+ assert 'inner_key' in dict_['key1']
+ assert isinstance(dict_['key1'], common._Dict)
+ assert dict_['key1']['inner_key'] == 'value2'
+
+ def test_get_item(self, actor, model):
+ dict_ = common._Dict(actor, model)
+ dict_['key1'] = Parameter.wrap('key1', 'value1')
+
+ assert isinstance(dict_._attributes['key1'], Parameter)
+
+ def test_update(self, actor, model):
+ dict_ = common._Dict(actor, model)
+ dict_['key1'] = 'value1'
+
+ new_dict = {'key2': 'value2'}
+ dict_.update(new_dict)
+ assert len(dict_) == 2
+ assert dict_['key2'] == 'value2'
+ assert isinstance(dict_._attributes['key2'], Parameter)
+
+ new_dict = {}
+ new_dict.update(dict_)
+ assert new_dict['key1'] == dict_['key1']
+
+ def test_copy(self, actor, model):
+ dict_ = common._Dict(actor, model)
+ dict_['key1'] = 'value1'
+
+ new_dict = dict_.copy()
+ assert new_dict is not dict_
+ assert new_dict == dict_
+
+ dict_['key1'] = 'value2'
+ assert new_dict['key1'] == 'value1'
+ assert dict_['key1'] == 'value2'
+
+ def test_clear(self, actor, model):
+ dict_ = common._Dict(actor, model)
+ dict_['key1'] = 'value1'
+ dict_.clear()
+
+ assert len(dict_) == 0
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
index 98ceff9..a41f9f0 100644
--- a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
+++ b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
@@ -136,7 +136,7 @@ class TestCtxProxy(object):
kwargs=kwargs)
@pytest.fixture
- def ctx(self):
+ def ctx(self, mocker):
class MockCtx(object):
pass
ctx = MockCtx()
@@ -160,6 +160,7 @@ class TestCtxProxy(object):
ctx.stub_args = self.stub_args
ctx.stub_attr = self.StubAttribute()
ctx.node = self.NodeAttribute(properties)
+ ctx.model = mocker.MagicMock()
return ctx
@pytest.fixture
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 09d0499..d9115e1 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -43,26 +43,26 @@ class TestLocalRunScript(object):
script_path = self._create_script(
tmpdir,
linux_script='''#! /bin/bash -e
- ctx node runtime-properties map.key value
+ ctx node attributes map.key value
''',
windows_script='''
- ctx node runtime-properties map.key value
+ ctx node attributes map.key value
''')
props = self._run(
executor, workflow_context,
script_path=script_path)
- assert props['map']['key'] == 'value'
+ assert props['map'].value['key'] == 'value'
def test_process_env(self, executor, workflow_context, tmpdir):
script_path = self._create_script(
tmpdir,
linux_script='''#! /bin/bash -e
- ctx node runtime-properties map.key1 $key1
- ctx node runtime-properties map.key2 $key2
+ ctx node attributes map.key1 $key1
+ ctx node attributes map.key2 $key2
''',
windows_script='''
- ctx node runtime-properties map.key1 %key1%
- ctx node runtime-properties map.key2 %key2%
+ ctx node attributes map.key1 %key1%
+ ctx node attributes map.key2 %key2%
''')
props = self._run(
executor, workflow_context,
@@ -73,7 +73,7 @@ class TestLocalRunScript(object):
'key2': 'value2'
}
})
- p_map = props['map']
+ p_map = props['map'].value
assert p_map['key1'] == 'value1'
assert p_map['key2'] == 'value2'
@@ -81,10 +81,10 @@ class TestLocalRunScript(object):
script_path = self._create_script(
tmpdir,
linux_script='''#! /bin/bash -e
- ctx node runtime-properties map.cwd $PWD
+ ctx node attributes map.cwd $PWD
''',
windows_script='''
- ctx node runtime-properties map.cwd %CD%
+ ctx node attributes map.cwd %CD%
''')
tmpdir = str(tmpdir)
props = self._run(
@@ -93,11 +93,11 @@ class TestLocalRunScript(object):
process={
'cwd': tmpdir
})
- p_map = props['map']
+ p_map = props['map'].value
assert p_map['cwd'] == tmpdir
def test_process_command_prefix(self, executor, workflow_context, tmpdir):
- use_ctx = 'ctx node runtime-properties map.key value'
+ use_ctx = 'ctx node attributes map.key value'
python_script = ['import subprocess',
'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)]
python_script = '\n'.join(python_script)
@@ -114,19 +114,19 @@ class TestLocalRunScript(object):
'env': {'TEST_KEY': 'value'},
'command_prefix': 'python'
})
- p_map = props['map']
+ p_map = props['map'].value
assert p_map['key'] == 'value'
def test_process_args(self, executor, workflow_context, tmpdir):
script_path = self._create_script(
tmpdir,
linux_script='''#! /bin/bash -e
- ctx node runtime-properties map.arg1 "$1"
- ctx node runtime-properties map.arg2 $2
+ ctx node attributes map.arg1 "$1"
+ ctx node attributes map.arg2 $2
''',
windows_script='''
- ctx node runtime-properties map.arg1 %1
- ctx node runtime-properties map.arg2 %2
+ ctx node attributes map.arg1 %1
+ ctx node attributes map.arg2 %2
''')
props = self._run(
executor, workflow_context,
@@ -134,8 +134,8 @@ class TestLocalRunScript(object):
process={
'args': ['"arg with spaces"', 'arg2']
})
- assert props['map']['arg1'] == 'arg with spaces'
- assert props['map']['arg2'] == 'arg2'
+ assert props['map'].value['arg1'] == 'arg with spaces'
+ assert props['map'].value['arg2'] == 'arg2'
def test_no_script_path(self, executor, workflow_context):
exception = self._run_and_get_task_exception(
@@ -187,7 +187,7 @@ class TestLocalRunScript(object):
script = '''
from aria.orchestrator.execution_plugin import ctx, inputs
if __name__ == '__main__':
- ctx.node.runtime_properties['key'] = inputs['key']
+ ctx.node.attributes['key'] = inputs['key']
'''
suffix = '.py'
script_path = self._create_script(
@@ -200,7 +200,7 @@ if __name__ == '__main__':
executor, workflow_context,
script_path=script_path,
inputs={'key': 'value'})
- assert props['key'] == 'value'
+ assert props['key'].value == 'value'
@pytest.mark.parametrize(
'value', ['string-value', [1, 2, 3], 999, 3.14, False,
@@ -209,16 +209,17 @@ if __name__ == '__main__':
script_path = self._create_script(
tmpdir,
linux_script='''#! /bin/bash -e
- ctx node runtime-properties key "${input_as_env_var}"
+ ctx node attributes key "${input_as_env_var}"
''',
windows_script='''
- ctx node runtime-properties key "%input_as_env_var%"
+ ctx node attributes key "%input_as_env_var%"
''')
props = self._run(
executor, workflow_context,
script_path=script_path,
env_var=value)
- expected = props['key'] if isinstance(value, basestring) else json.loads(props['key'])
+ value = props['key'].value
+ expected = value if isinstance(value, basestring) else json.loads(value)
assert expected == value
@pytest.mark.parametrize('value', ['override', {'key': 'value'}])
@@ -227,10 +228,10 @@ if __name__ == '__main__':
script_path = self._create_script(
tmpdir,
linux_script='''#! /bin/bash -e
- ctx node runtime-properties key "${input_as_env_var}"
+ ctx node attributes key "${input_as_env_var}"
''',
windows_script='''
- ctx node runtime-properties key "%input_as_env_var%"
+ ctx node attributes key "%input_as_env_var%"
''')
props = self._run(
@@ -242,17 +243,18 @@ if __name__ == '__main__':
'input_as_env_var': value
}
})
- expected = props['key'] if isinstance(value, basestring) else json.loads(props['key'])
+ value = props['key'].value
+ expected = value if isinstance(value, basestring) else json.loads(value)
assert expected == value
def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir):
script_path = self._create_script(
tmpdir,
linux_script='''#! /bin/bash -e
- ctx node runtime-properties nonexistent
+ ctx node attributes nonexistent
''',
windows_script='''
- ctx node runtime-properties nonexistent
+ ctx node attributes nonexistent
''')
exception = self._run_and_get_task_exception(
executor, workflow_context,
@@ -266,10 +268,10 @@ if __name__ == '__main__':
script_path = self._create_script(
tmpdir,
linux_script='''#! /bin/bash -e
- ctx -j instance runtime-properties nonexistent
+ ctx -j instance attributes nonexistent
''',
windows_script='''
- ctx -j instance runtime-properties nonexistent
+ ctx -j instance attributes nonexistent
''')
exception = self._run_and_get_task_exception(
executor, workflow_context,
@@ -502,7 +504,7 @@ if __name__ == '__main__':
tasks_graph=tasks_graph)
eng.execute()
return workflow_context.model.node.get_by_name(
- mock.models.DEPENDENCY_NODE_NAME).runtime_properties
+ mock.models.DEPENDENCY_NODE_NAME).attributes
@pytest.fixture
def executor(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index a9dc5e8..92d250e 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -53,9 +53,9 @@ _FABRIC_ENV = {
class TestWithActualSSHServer(object):
def test_run_script_basic(self):
- expected_runtime_property_value = 'some_value'
- props = self._execute(env={'test_value': expected_runtime_property_value})
- assert props['test_value'] == expected_runtime_property_value
+ expected_attribute_value = 'some_value'
+ props = self._execute(env={'test_value': expected_attribute_value})
+ assert props['test_value'].value == expected_attribute_value
@pytest.mark.skip(reason='sudo privileges are required')
def test_run_script_as_sudo(self):
@@ -66,7 +66,7 @@ class TestWithActualSSHServer(object):
def test_run_script_default_base_dir(self):
props = self._execute()
- assert props['work_dir'] == '{0}/work'.format(constants.DEFAULT_BASE_DIR)
+ assert props['work_dir'].value == '{0}/work'.format(constants.DEFAULT_BASE_DIR)
@pytest.mark.skip(reason='Re-enable once output from process executor can be captured')
@pytest.mark.parametrize('hide_groups', [[], ['everything']])
@@ -93,16 +93,16 @@ class TestWithActualSSHServer(object):
'cwd': expected_cwd,
'base_dir': expected_base_dir
})
- assert props['env_value'] == expected_env_value
- assert len(props['bash_version']) > 0
- assert props['arg1_value'] == expected_arg1_value
- assert props['arg2_value'] == expected_arg2_value
- assert props['cwd'] == expected_cwd
- assert props['ctx_path'] == '{0}/ctx'.format(expected_base_dir)
+ assert props['env_value'].value == expected_env_value
+ assert len(props['bash_version'].value) > 0
+ assert props['arg1_value'].value == expected_arg1_value
+ assert props['arg2_value'].value == expected_arg2_value
+ assert props['cwd'].value == expected_cwd
+ assert props['ctx_path'].value == '{0}/ctx'.format(expected_base_dir)
def test_run_script_command_prefix(self):
props = self._execute(process={'command_prefix': 'bash -i'})
- assert 'i' in props['dollar_dash']
+ assert 'i' in props['dollar_dash'].value
def test_run_script_reuse_existing_ctx(self):
expected_test_value_1 = 'test_value_1'
@@ -112,27 +112,27 @@ class TestWithActualSSHServer(object):
'{0}_2'.format(self.test_name)],
env={'test_value1': expected_test_value_1,
'test_value2': expected_test_value_2})
- assert props['test_value1'] == expected_test_value_1
- assert props['test_value2'] == expected_test_value_2
+ assert props['test_value1'].value == expected_test_value_1
+ assert props['test_value2'].value == expected_test_value_2
def test_run_script_download_resource_plain(self, tmpdir):
resource = tmpdir.join('resource')
resource.write('content')
self._upload(str(resource), 'test_resource')
props = self._execute()
- assert props['test_value'] == 'content'
+ assert props['test_value'].value == 'content'
def test_run_script_download_resource_and_render(self, tmpdir):
resource = tmpdir.join('resource')
resource.write('{{ctx.service.name}}')
self._upload(str(resource), 'test_resource')
props = self._execute()
- assert props['test_value'] == self._workflow_context.service.name
+ assert props['test_value'].value == self._workflow_context.service.name
@pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
def test_run_script_inputs_as_env_variables_no_override(self, value):
props = self._execute(custom_input=value)
- return_value = props['test_value']
+ return_value = props['test_value'].value
expected = return_value if isinstance(value, basestring) else json.loads(return_value)
assert value == expected
@@ -140,7 +140,7 @@ class TestWithActualSSHServer(object):
def test_run_script_inputs_as_env_variables_process_env_override(self, value):
props = self._execute(custom_input='custom-input-value',
env={'custom_env_var': value})
- return_value = props['test_value']
+ return_value = props['test_value'].value
expected = return_value if isinstance(value, basestring) else json.loads(return_value)
assert value == expected
@@ -260,7 +260,7 @@ class TestWithActualSSHServer(object):
tasks_graph=tasks_graph)
eng.execute()
return self._workflow_context.model.node.get_by_name(
- mock.models.DEPENDENCY_NODE_NAME).runtime_properties
+ mock.models.DEPENDENCY_NODE_NAME).attributes
def _execute_and_get_task_exception(self, *args, **kwargs):
signal = events.on_failure_task_signal
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 50ca7f5..b1b8251 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -100,7 +100,7 @@ class TestOperationTask(object):
storage_task = ctx.model.task.get_by_name(core_task.name)
assert storage_task.plugin is storage_plugin
assert storage_task.execution_name == ctx.execution.name
- assert storage_task.actor == core_task.context.node
+ assert storage_task.actor == core_task.context.node._actor
assert core_task.model_task == storage_task
assert core_task.name == api_task.name
assert core_task.implementation == api_task.implementation
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 8ad8edb..3f89c2b 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -74,3 +74,7 @@ class MockContext(object):
return cls(storage=aria.application_model_storage(**kwargs))
else:
return cls()
+
+ @staticmethod
+ def _teardown_db_resources():
+ pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 1dbfae1..92f0fc4 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -17,7 +17,6 @@ import time
import fasteners
import pytest
-from aria.storage.exceptions import StorageError
from aria.orchestrator import events
from aria.orchestrator.workflows.exceptions import ExecutorException
from aria.orchestrator.workflows import api
@@ -29,47 +28,37 @@ from tests.orchestrator.context import execute as execute_workflow
from tests.orchestrator.workflows.helpers import events_collector
from tests import mock
from tests import storage
+from tests import helpers
-def test_concurrent_modification_on_task_succeeded(context, executor, lock_files):
- _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True)
-
-
-@operation
-def _test_task_succeeded(ctx, lock_files, key, first_value, second_value):
- _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
+@pytest.fixture
+def dataholder(tmpdir):
+ dataholder_path = str(tmpdir.join('dataholder'))
+ holder = helpers.FilesystemDataHolder(dataholder_path)
+ return holder
-def test_concurrent_modification_on_task_failed(context, executor, lock_files):
- _test(context, executor, lock_files, _test_task_failed, expected_failure=True)
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder):
+ _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False)
@operation
-def _test_task_failed(ctx, lock_files, key, first_value, second_value):
- first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
- if not first:
- raise RuntimeError('MESSAGE')
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path):
+ _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
-def test_concurrent_modification_on_update_and_refresh(context, executor, lock_files):
- _test(context, executor, lock_files, _test_update_and_refresh, expected_failure=False)
+def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder):
+ _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True)
@operation
-def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value):
- node = ctx.node
- first = _concurrent_update(lock_files, node, key, first_value, second_value)
+def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path):
+ first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
if not first:
- try:
- ctx.model.node.update(node)
- except StorageError as e:
- assert 'Version conflict' in str(e)
- ctx.model.node.refresh(node)
- else:
- raise RuntimeError('Unexpected')
+ raise RuntimeError('MESSAGE')
-def _test(context, executor, lock_files, func, expected_failure):
+def _test(context, executor, lock_files, func, dataholder, expected_failure):
def _node(ctx):
return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
@@ -82,7 +71,8 @@ def _test(context, executor, lock_files, func, expected_failure):
'lock_files': lock_files,
'key': key,
'first_value': first_value,
- 'second_value': second_value
+ 'second_value': second_value,
+ 'holder_path': dataholder.path
}
node = _node(context)
@@ -118,17 +108,13 @@ def _test(context, executor, lock_files, func, expected_failure):
except ExecutorException:
pass
- props = _node(context).runtime_properties
- assert props[key] == first_value
+ props = _node(context).attributes
+ assert dataholder['invocations'] == 2
+ assert props[key].value == dataholder[key]
exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
if expected_failure:
assert exceptions
- exception = exceptions[-1]
- assert isinstance(exception, StorageError)
- assert 'Version conflict' in str(exception)
- else:
- assert not exceptions
@pytest.fixture
@@ -150,8 +136,8 @@ def lock_files(tmpdir):
return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
-def _concurrent_update(lock_files, node, key, first_value, second_value):
-
+def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path):
+ holder = helpers.FilesystemDataHolder(holder_path)
locker1 = fasteners.InterProcessLock(lock_files[0])
locker2 = fasteners.InterProcessLock(lock_files[1])
@@ -161,11 +147,14 @@ def _concurrent_update(lock_files, node, key, first_value, second_value):
# Give chance for both processes to acquire locks
while locker2.acquire(blocking=False):
locker2.release()
- time.sleep(0.01)
+ time.sleep(0.1)
else:
locker2.acquire()
- node.runtime_properties[key] = first_value if first else second_value
+ node.attributes[key] = first_value if first else second_value
+ holder['key'] = first_value if first else second_value
+ holder.setdefault('invocations', 0)
+ holder['invocations'] += 1
if first:
locker1.release()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 878ac24..30b23ed 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -56,7 +56,7 @@ def test_decorate_extension(context, executor):
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
eng.execute()
- out = get_node(context).runtime_properties['out']
+ out = get_node(context).attributes.get('out').value
assert out['wrapper_inputs'] == inputs
assert out['function_inputs'] == inputs
@@ -67,7 +67,7 @@ class MockProcessExecutorExtension(object):
def decorate(self):
def decorator(function):
def wrapper(ctx, **operation_inputs):
- ctx.node.runtime_properties['out'] = {'wrapper_inputs': operation_inputs}
+ ctx.node.attributes['out'] = {'wrapper_inputs': operation_inputs}
function(ctx=ctx, **operation_inputs)
return wrapper
return decorator
@@ -75,7 +75,7 @@ class MockProcessExecutorExtension(object):
@operation
def _mock_operation(ctx, **operation_inputs):
- ctx.node.runtime_properties['out']['function_inputs'] = operation_inputs
+ ctx.node.attributes['out']['function_inputs'] = operation_inputs
@pytest.fixture
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 4fbe9c1..2b628a0 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -28,7 +28,7 @@ from tests import mock
from tests import storage
-_TEST_RUNTIME_PROPERTIES = {
+_TEST_ATTRIBUTES = {
'some': 'values', 'that': 'are', 'most': 'likely', 'only': 'set', 'here': 'yo'
}
@@ -46,17 +46,18 @@ def test_track_changes_of_failed_operation(context, executor):
def _assert_tracked_changes_are_applied(context):
instance = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
- assert instance.runtime_properties == _TEST_RUNTIME_PROPERTIES
+ assert all(instance.attributes[key].value == value
+ for key, value in _TEST_ATTRIBUTES.items())
-def _update_runtime_properties(context):
- context.node.runtime_properties.clear()
- context.node.runtime_properties.update(_TEST_RUNTIME_PROPERTIES)
+def _update_attributes(context):
+ context.node.attributes.clear()
+ context.node.attributes.update(_TEST_ATTRIBUTES)
def test_refresh_state_of_tracked_attributes(context, executor):
out = _run_workflow(context=context, executor=executor, op_func=_mock_refreshing_operation)
- assert out['initial'] == out['after_refresh']
+ assert out['after_refresh'] == out['after_change']
assert out['initial'] != out['after_change']
@@ -66,22 +67,19 @@ def test_apply_tracked_changes_during_an_operation(context, executor):
'changed_but_refreshed': {'some': 'newer', 'properties': 'right there'}
}
- expected_initial = context.model.node.get_by_name(
- mock.models.DEPENDENCY_NODE_NAME).runtime_properties
-
- out = _run_workflow(context=context, executor=executor, op_func=_mock_updating_operation,
- inputs=inputs)
+ expected_initial = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes
+ out = _run_workflow(
+ context=context, executor=executor, op_func=_mock_updating_operation, inputs=inputs)
expected_after_update = expected_initial.copy()
expected_after_update.update(inputs['committed']) # pylint: disable=no-member
expected_after_change = expected_after_update.copy()
expected_after_change.update(inputs['changed_but_refreshed']) # pylint: disable=no-member
- expected_after_refresh = expected_after_update
assert out['initial'] == expected_initial
assert out['after_update'] == expected_after_update
assert out['after_change'] == expected_after_change
- assert out['after_refresh'] == expected_after_refresh
+ assert out['after_refresh'] == expected_after_change
def _run_workflow(context, executor, op_func, inputs=None):
@@ -109,42 +107,42 @@ def _run_workflow(context, executor, op_func, inputs=None):
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
eng.execute()
- return context.model.node.get_by_name(
- mock.models.DEPENDENCY_NODE_NAME).runtime_properties.get('out')
+ out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
+ return out.value if out else None
@operation
def _mock_success_operation(ctx):
- _update_runtime_properties(ctx)
+ _update_attributes(ctx)
@operation
def _mock_fail_operation(ctx):
- _update_runtime_properties(ctx)
+ _update_attributes(ctx)
raise RuntimeError
@operation
def _mock_refreshing_operation(ctx):
- out = {'initial': copy.deepcopy(ctx.node.runtime_properties)}
- ctx.node.runtime_properties.update({'some': 'new', 'properties': 'right here'})
- out['after_change'] = copy.deepcopy(ctx.node.runtime_properties)
+ out = {'initial': copy.deepcopy(ctx.node.attributes)}
+ ctx.node.attributes.update({'some': 'new', 'properties': 'right here'})
+ out['after_change'] = copy.deepcopy(ctx.node.attributes)
ctx.model.node.refresh(ctx.node)
- out['after_refresh'] = copy.deepcopy(ctx.node.runtime_properties)
- ctx.node.runtime_properties['out'] = out
+ out['after_refresh'] = copy.deepcopy(ctx.node.attributes)
+ ctx.node.attributes['out'] = out
@operation
def _mock_updating_operation(ctx, committed, changed_but_refreshed):
- out = {'initial': copy.deepcopy(ctx.node.runtime_properties)}
- ctx.node.runtime_properties.update(committed)
+ out = {'initial': copy.deepcopy(ctx.node.attributes)}
+ ctx.node.attributes.update(committed)
ctx.model.node.update(ctx.node)
- out['after_update'] = copy.deepcopy(ctx.node.runtime_properties)
- ctx.node.runtime_properties.update(changed_but_refreshed)
- out['after_change'] = copy.deepcopy(ctx.node.runtime_properties)
+ out['after_update'] = copy.deepcopy(ctx.node.attributes)
+ ctx.node.attributes.update(changed_but_refreshed)
+ out['after_change'] = copy.deepcopy(ctx.node.attributes)
ctx.model.node.refresh(ctx.node)
- out['after_refresh'] = copy.deepcopy(ctx.node.runtime_properties)
- ctx.node.runtime_properties['out'] = out
+ out['after_refresh'] = copy.deepcopy(ctx.node.attributes)
+ ctx.node.attributes['out'] = out
def _operation_mapping(func):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ebd1ef85/tests/resources/scripts/test_ssh.sh
----------------------------------------------------------------------
diff --git a/tests/resources/scripts/test_ssh.sh b/tests/resources/scripts/test_ssh.sh
index 90202c7..bbdf773 100644
--- a/tests/resources/scripts/test_ssh.sh
+++ b/tests/resources/scripts/test_ssh.sh
@@ -4,7 +4,7 @@ set -u
set -e
test_run_script_basic() {
- ctx node runtime-properties test_value $test_value
+ ctx node attributes test_value $test_value
}
test_run_script_as_sudo() {
@@ -12,7 +12,7 @@ test_run_script_as_sudo() {
}
test_run_script_default_base_dir() {
- ctx node runtime-properties work_dir $PWD
+ ctx node attributes work_dir $PWD
}
test_run_script_with_hide() {
@@ -20,44 +20,44 @@ test_run_script_with_hide() {
}
test_run_script_process_config() {
- ctx node runtime-properties env_value $test_value_env
- ctx node runtime-properties bash_version $BASH_VERSION
- ctx node runtime-properties arg1_value $1
- ctx node runtime-properties arg2_value $2
- ctx node runtime-properties cwd $PWD
- ctx node runtime-properties ctx_path $(which ctx)
+ ctx node attributes env_value $test_value_env
+ ctx node attributes bash_version $BASH_VERSION
+ ctx node attributes arg1_value $1
+ ctx node attributes arg2_value $2
+ ctx node attributes cwd $PWD
+ ctx node attributes ctx_path $(which ctx)
}
test_run_script_command_prefix() {
- ctx node runtime-properties dollar_dash $-
+ ctx node attributes dollar_dash $-
}
test_run_script_reuse_existing_ctx_1() {
- ctx node runtime-properties test_value1 $test_value1
+ ctx node attributes test_value1 $test_value1
}
test_run_script_reuse_existing_ctx_2() {
- ctx node runtime-properties test_value2 $test_value2
+ ctx node attributes test_value2 $test_value2
}
test_run_script_download_resource_plain() {
local destination=$(mktemp)
ctx download-resource ${destination} test_resource
- ctx node runtime-properties test_value "$(cat ${destination})"
+ ctx node attributes test_value "$(cat ${destination})"
}
test_run_script_download_resource_and_render() {
local destination=$(mktemp)
ctx download-resource-and-render ${destination} test_resource
- ctx node runtime-properties test_value "$(cat ${destination})"
+ ctx node attributes test_value "$(cat ${destination})"
}
test_run_script_inputs_as_env_variables_no_override() {
- ctx node runtime-properties test_value "$custom_env_var"
+ ctx node attributes test_value "$custom_env_var"
}
test_run_script_inputs_as_env_variables_process_env_override() {
- ctx node runtime-properties test_value "$custom_env_var"
+ ctx node attributes test_value "$custom_env_var"
}
test_run_script_error_in_script() {