You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/11 11:44:47 UTC
[06/12] incubator-ariatosca git commit: added another sqla event
handler - needs rework, since we listen to init and not commit
added another sqla event handler - needs rework, since we listen to init and not commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/41cd1bee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/41cd1bee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/41cd1bee
Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue
Commit: 41cd1bee4e09d1872113a1ddae2d4cc17387c5e1
Parents: c8896ab
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed May 10 18:38:36 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu May 11 14:28:49 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/context/common.py | 3 +-
aria/orchestrator/workflows/executor/process.py | 43 +++++++---
aria/storage/instrumentation.py | 83 ++++++++++++--------
aria/storage/sql_mapi.py | 2 +-
examples/hello-world/scripts/configure.sh | 4 +-
examples/hello-world/scripts/start.sh | 6 +-
examples/hello-world/scripts/stop.sh | 4 +-
7 files changed, 89 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index c0047e9..0854a27 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -192,7 +192,6 @@ class BaseContext(object):
def _render_resource(self, resource_content, variables):
variables = variables or {}
- if 'ctx' not in variables:
- variables['ctx'] = self
+ variables.setdefault('ctx', self)
resource_template = jinja2.Template(resource_content)
return resource_template.render(variables)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index e464f7d..52da26d 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -229,6 +229,7 @@ class ProcessExecutor(base.BaseExecutor):
def _apply_tracked_changes(task, request):
instrumentation.apply_tracked_changes(
tracked_changes=request['tracked_changes'],
+ new_instances=request['new_instances'],
model=task.context.model)
@@ -277,22 +278,28 @@ class _Messenger(object):
"""Task started message"""
self._send_message(type='started')
- def succeeded(self, tracked_changes):
+ def succeeded(self, tracked_changes, new_instances):
"""Task succeeded message"""
- self._send_message(type='succeeded', tracked_changes=tracked_changes)
+ self._send_message(
+ type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances)
- def failed(self, tracked_changes, exception):
+ def failed(self, tracked_changes, new_instances, exception):
"""Task failed message"""
- self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception)
+ self._send_message(type='failed',
+ tracked_changes=tracked_changes,
+ new_instances=new_instances,
+ exception=exception)
- def apply_tracked_changes(self, tracked_changes):
- self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes)
+ def apply_tracked_changes(self, tracked_changes, new_instances):
+ self._send_message(type='apply_tracked_changes',
+ tracked_changes=tracked_changes,
+ new_instances=new_instances)
def closed(self):
"""Executor closed message"""
self._send_message(type='closed')
- def _send_message(self, type, tracked_changes=None, exception=None):
+ def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', self.port))
try:
@@ -301,7 +308,8 @@ class _Messenger(object):
'task_id': self.task_id,
'exception': exceptions.wrap_if_needed(exception),
'traceback': exceptions.get_exception_as_string(*sys.exc_info()),
- 'tracked_changes': tracked_changes
+ 'tracked_changes': tracked_changes,
+ 'new_instances': new_instances or {}
})
response = _recv_message(sock)
response_exception = response.get('exception')
@@ -311,7 +319,7 @@ class _Messenger(object):
sock.close()
-def _patch_session(ctx, messenger, instrument):
+def _patch_ctx(ctx, messenger, instrument):
# model will be None only in tests that test the executor component directly
if not ctx.model:
return
@@ -326,17 +334,23 @@ def _patch_session(ctx, messenger, instrument):
original_refresh(target)
def patched_commit():
- messenger.apply_tracked_changes(instrument.tracked_changes)
+ messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances)
instrument.clear()
def patched_rollback():
# Rollback is performed on parent process when commit fails
pass
+ def patched_put(_):
+ # TODO: currently we need to add signal to the put event (or commit per model), but currently we just use the init event.
+ pass
+
# when autoflush is set to true (the default), refreshing an object will trigger
# an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so
# far on the session. this is not the desired behavior in the subprocess
session.autoflush = False
+ # for model_name in instrument.new_instances:
+ ctx.model.log.put = patched_put
session.commit = patched_commit
session.rollback = patched_rollback
@@ -368,15 +382,18 @@ def _main():
try:
messenger.started()
ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
- _patch_session(ctx=ctx, messenger=messenger, instrument=instrument)
+ _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument)
task_func = imports.load_attribute(implementation)
aria.install_aria_extensions()
for decorate in process_executor.decorate():
task_func = decorate(task_func)
task_func(ctx=ctx, **operation_inputs)
- messenger.succeeded(tracked_changes=instrument.tracked_changes)
+ messenger.succeeded(tracked_changes=instrument.tracked_changes,
+ new_instances=instrument.new_instances)
except BaseException as e:
- messenger.failed(exception=e, tracked_changes=instrument.tracked_changes)
+ messenger.failed(exception=e,
+ tracked_changes=instrument.tracked_changes,
+ new_instances=instrument.new_instances)
if __name__ == '__main__':
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 79f821a..6ca79ce 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -24,22 +24,17 @@ from ..storage.exceptions import StorageError
_VERSION_ID_COL = 'version'
_STUB = object()
_INSTRUMENTED = {
- # Node related stuff
- _models.Node.runtime_properties: dict,
- _models.Node.state: str,
-
- # # Task related stuff
- _models.Task.status: str,
-
- # # Log related stuff
- _models.Log.level: str,
- _models.Log.msg: str,
- _models.Log.traceback: str,
- _models.Log.created_at: lambda date: date,
- _models.Log.execution_fk: int,
- _models.Log.task_fk: int,
+ 'modified': {
+ _models.Node.runtime_properties: dict,
+ _models.Node.state: str,
+ # _models.Task.status: str,
+ },
+ 'new': (_models.Log, )
+
}
+_NEW_INSTANCE = 'NEW_INSTANCE'
+
def track_changes(instrumented=None):
"""Track changes in the specified model columns
@@ -71,23 +66,45 @@ class _Instrumentation(object):
def __init__(self, instrumented):
self.tracked_changes = {}
+ self.new_instances = {}
self.listeners = []
self._track_changes(instrumented)
+ self._new_instance_index = 0
+
+ @property
+ def _new_instance_id(self):
+ self._new_instance_index += 1
+ return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, index=self._new_instance_index - 1)
def _track_changes(self, instrumented):
- instrumented_classes = {}
- for instrumented_attribute, attribute_type in instrumented.items():
+ instrumented_attribute_classes = {}
+ for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
self._register_set_attribute_listener(
instrumented_attribute=instrumented_attribute,
attribute_type=attribute_type)
instrumented_class = instrumented_attribute.parent.entity
- instrumented_class_attributes = instrumented_classes.setdefault(instrumented_class, {})
+ instrumented_class_attributes = instrumented_attribute_classes.setdefault(
+ instrumented_class, {})
instrumented_class_attributes[instrumented_attribute.key] = attribute_type
- for instrumented_class, instrumented_attributes in instrumented_classes.items():
- self._register_instance_listeners(
+ for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items():
+ self._register_instance_attribute_listeners(
instrumented_class=instrumented_class,
instrumented_attributes=instrumented_attributes)
+ # instrument creation of new instances
+ for instrumented_class in instrumented.get('new', {}):
+ self._register_instance_listener(instrumented_class)
+
+ def _register_instance_listener(self, instrumented_class):
+ def listener(target, *args, **_):
+ mapi_name = target.__modelname__
+ tracked_instances = self.new_instances.setdefault(mapi_name, {})
+ tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {})
+ tracked_attributes.update(**args[1])
+ listener_args = (instrumented_class, 'init', listener)
+ sqlalchemy.event.listen(*listener_args)
+ self.listeners.append(listener_args)
+
def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
def listener(target, value, *_):
mapi_name = target.__modelname__
@@ -103,7 +120,7 @@ class _Instrumentation(object):
sqlalchemy.event.listen(*listener_args, retval=True)
self.listeners.append(listener_args)
- def _register_instance_listeners(self, instrumented_class, instrumented_attributes):
+ def _register_instance_attribute_listeners(self, instrumented_class, instrumented_attributes):
def listener(target, *_):
mapi_name = instrumented_class.__modelname__
tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
@@ -136,6 +153,8 @@ class _Instrumentation(object):
else:
self.tracked_changes.clear()
+ self.new_instances.clear()
+
def restore(self):
"""Remove all listeners registered by this instrumentation"""
for listener_args in self.listeners:
@@ -171,7 +190,7 @@ class _Value(object):
return {'initial': self.initial, 'current': self.current}.copy()
-def apply_tracked_changes(tracked_changes, model):
+def apply_tracked_changes(tracked_changes, new_instances, model):
"""Write tracked changes back to the database using provided model storage
:param tracked_changes: The ``tracked_changes`` attribute of the instrumentation context
@@ -188,23 +207,21 @@ def apply_tracked_changes(tracked_changes, model):
instance = None
for attribute_name, value in tracked_attributes.items():
if value.initial != value.current:
- if not instance:
- # The object can be entirely new (Log is an example of this use case,
- # its id is None (or 'null'), thus we need to create it from scratch,
- # and not just update it.
- instance = \
- mapi.model_cls() if instance_id == 'null' else mapi.get(instance_id)
-
+ instance = instance or mapi.get(instance_id)
setattr(instance, attribute_name, value.current)
if instance:
_validate_version_id(instance, mapi)
- # This follows the same logic as the same comment regarding 'null'
- if instance_id in ('null', None):
- mapi.put(instance)
- else:
- mapi.update(instance)
+ mapi.update(instance)
successfully_updated_changes[mapi_name][instance_id] = [
v.dict for v in tracked_attributes.values()]
+
+ for mapi_name, new_instance in new_instances.items():
+ successfully_updated_changes[mapi_name] = dict()
+ mapi = getattr(model, mapi_name)
+ for new_instance_kwargs in sorted(new_instance.values()):
+ instance = mapi.model_cls(**new_instance_kwargs)
+ mapi.put(instance)
+ successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs
except BaseException:
for key, value in successfully_updated_changes.items():
if not value:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
index 730d007..a22c4b1 100644
--- a/aria/storage/sql_mapi.py
+++ b/aria/storage/sql_mapi.py
@@ -397,7 +397,7 @@ def init_storage(base_dir, filename='db.sqlite'):
path=os.path.join(base_dir, filename))
- engine = create_engine(uri)
+ engine = create_engine(uri, )
session_factory = orm.sessionmaker(bind=engine)
session = orm.scoped_session(session_factory=session_factory)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/examples/hello-world/scripts/configure.sh
----------------------------------------------------------------------
diff --git a/examples/hello-world/scripts/configure.sh b/examples/hello-world/scripts/configure.sh
index 6e93053..b55ec17 100755
--- a/examples/hello-world/scripts/configure.sh
+++ b/examples/hello-world/scripts/configure.sh
@@ -8,7 +8,7 @@ if [ -d ${PYTHON_FILE_SERVER_ROOT} ]; then
echo "Removing file server root folder ${PYTHON_FILE_SERVER_ROOT}"
rm -rf ${PYTHON_FILE_SERVER_ROOT}
fi
-#ctx logger info "Creating HTTP server root directory at ${PYTHON_FILE_SERVER_ROOT}"
+ctx logger info "Creating HTTP server root directory at ${PYTHON_FILE_SERVER_ROOT}"
mkdir -p ${PYTHON_FILE_SERVER_ROOT}
@@ -17,7 +17,7 @@ cd ${PYTHON_FILE_SERVER_ROOT}
index_path="index.html"
image_path="images/aria-logo.png"
-#ctx logger info "Downloading blueprint resources..."
+ctx logger info "Downloading blueprint resources..."
ctx download-resource-and-render ${PYTHON_FILE_SERVER_ROOT}/index.html ${index_path}
ctx download-resource ${PYTHON_FILE_SERVER_ROOT}/aria-logo.png ${image_path}
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/examples/hello-world/scripts/start.sh
----------------------------------------------------------------------
diff --git a/examples/hello-world/scripts/start.sh b/examples/hello-world/scripts/start.sh
index 03c59d4..41b6ec7 100755
--- a/examples/hello-world/scripts/start.sh
+++ b/examples/hello-world/scripts/start.sh
@@ -6,16 +6,16 @@ TEMP_DIR="/tmp"
PYTHON_FILE_SERVER_ROOT=${TEMP_DIR}/python-simple-http-webserver
PID_FILE="server.pid"
-#ctx logger info "Starting HTTP server from ${PYTHON_FILE_SERVER_ROOT}"
+ctx logger info "Starting HTTP server from ${PYTHON_FILE_SERVER_ROOT}"
port=$(ctx node properties port)
cd ${PYTHON_FILE_SERVER_ROOT}
-#ctx logger info "Starting SimpleHTTPServer"
+ctx logger info "Starting SimpleHTTPServer"
nohup python -m SimpleHTTPServer ${port} > /dev/null 2>&1 &
echo $! > ${PID_FILE}
-#ctx logger info "Waiting for server to launch on port ${port}"
+ctx logger info "Waiting for server to launch on port ${port}"
url="http://localhost:${port}"
server_is_up() {
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/examples/hello-world/scripts/stop.sh
----------------------------------------------------------------------
diff --git a/examples/hello-world/scripts/stop.sh b/examples/hello-world/scripts/stop.sh
index 63e1971..5461caf 100755
--- a/examples/hello-world/scripts/stop.sh
+++ b/examples/hello-world/scripts/stop.sh
@@ -8,8 +8,8 @@ PID_FILE="server.pid"
PID=`cat ${PYTHON_FILE_SERVER_ROOT}/${PID_FILE}`
-#ctx logger info "Shutting down file server. pid = ${PID}"
+ctx logger info "Shutting down file server. pid = ${PID}"
kill -9 ${PID} || exit $?
-#ctx logger info "Deleting file server root directory (${PYTHON_FILE_SERVER_ROOT})"
+ctx logger info "Deleting file server root directory (${PYTHON_FILE_SERVER_ROOT})"
rm -rf ${PYTHON_FILE_SERVER_ROOT}