You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/11 11:44:47 UTC

[06/12] incubator-ariatosca git commit: added another sqla event handler - needs rework, since we listen to init and not commit

added another sqla event handler - needs rework, since we listen to init and not commit


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/41cd1bee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/41cd1bee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/41cd1bee

Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue
Commit: 41cd1bee4e09d1872113a1ddae2d4cc17387c5e1
Parents: c8896ab
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed May 10 18:38:36 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu May 11 14:28:49 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/context/common.py             |  3 +-
 aria/orchestrator/workflows/executor/process.py | 43 +++++++---
 aria/storage/instrumentation.py                 | 83 ++++++++++++--------
 aria/storage/sql_mapi.py                        |  2 +-
 examples/hello-world/scripts/configure.sh       |  4 +-
 examples/hello-world/scripts/start.sh           |  6 +-
 examples/hello-world/scripts/stop.sh            |  4 +-
 7 files changed, 89 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index c0047e9..0854a27 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -192,7 +192,6 @@ class BaseContext(object):
 
     def _render_resource(self, resource_content, variables):
         variables = variables or {}
-        if 'ctx' not in variables:
-            variables['ctx'] = self
+        variables.setdefault('ctx', self)
         resource_template = jinja2.Template(resource_content)
         return resource_template.render(variables)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index e464f7d..52da26d 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -229,6 +229,7 @@ class ProcessExecutor(base.BaseExecutor):
     def _apply_tracked_changes(task, request):
         instrumentation.apply_tracked_changes(
             tracked_changes=request['tracked_changes'],
+            new_instances=request['new_instances'],
             model=task.context.model)
 
 
@@ -277,22 +278,28 @@ class _Messenger(object):
         """Task started message"""
         self._send_message(type='started')
 
-    def succeeded(self, tracked_changes):
+    def succeeded(self, tracked_changes, new_instances):
         """Task succeeded message"""
-        self._send_message(type='succeeded', tracked_changes=tracked_changes)
+        self._send_message(
+            type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances)
 
-    def failed(self, tracked_changes, exception):
+    def failed(self, tracked_changes, new_instances, exception):
         """Task failed message"""
-        self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception)
+        self._send_message(type='failed',
+                           tracked_changes=tracked_changes,
+                           new_instances=new_instances,
+                           exception=exception)
 
-    def apply_tracked_changes(self, tracked_changes):
-        self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes)
+    def apply_tracked_changes(self, tracked_changes, new_instances):
+        self._send_message(type='apply_tracked_changes',
+                           tracked_changes=tracked_changes,
+                           new_instances=new_instances)
 
     def closed(self):
         """Executor closed message"""
         self._send_message(type='closed')
 
-    def _send_message(self, type, tracked_changes=None, exception=None):
+    def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.connect(('localhost', self.port))
         try:
@@ -301,7 +308,8 @@ class _Messenger(object):
                 'task_id': self.task_id,
                 'exception': exceptions.wrap_if_needed(exception),
                 'traceback': exceptions.get_exception_as_string(*sys.exc_info()),
-                'tracked_changes': tracked_changes
+                'tracked_changes': tracked_changes,
+                'new_instances': new_instances or {}
             })
             response = _recv_message(sock)
             response_exception = response.get('exception')
@@ -311,7 +319,7 @@ class _Messenger(object):
             sock.close()
 
 
-def _patch_session(ctx, messenger, instrument):
+def _patch_ctx(ctx, messenger, instrument):
     # model will be None only in tests that test the executor component directly
     if not ctx.model:
         return
@@ -326,17 +334,23 @@ def _patch_session(ctx, messenger, instrument):
         original_refresh(target)
 
     def patched_commit():
-        messenger.apply_tracked_changes(instrument.tracked_changes)
+        messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances)
         instrument.clear()
 
     def patched_rollback():
         # Rollback is performed on parent process when commit fails
         pass
 
+    def patched_put(_):
+        # TODO: currently we need to add signal to the put event (or commit per model), but currently we just use the init event.
+        pass
+
     # when autoflush is set to true (the default), refreshing an object will trigger
     # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so
     # far on the session. this is not the desired behavior in the subprocess
     session.autoflush = False
+    # for model_name in instrument.new_instances:
+    ctx.model.log.put = patched_put
 
     session.commit = patched_commit
     session.rollback = patched_rollback
@@ -368,15 +382,18 @@ def _main():
         try:
             messenger.started()
             ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
-            _patch_session(ctx=ctx, messenger=messenger, instrument=instrument)
+            _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument)
             task_func = imports.load_attribute(implementation)
             aria.install_aria_extensions()
             for decorate in process_executor.decorate():
                 task_func = decorate(task_func)
             task_func(ctx=ctx, **operation_inputs)
-            messenger.succeeded(tracked_changes=instrument.tracked_changes)
+            messenger.succeeded(tracked_changes=instrument.tracked_changes,
+                                new_instances=instrument.new_instances)
         except BaseException as e:
-            messenger.failed(exception=e, tracked_changes=instrument.tracked_changes)
+            messenger.failed(exception=e,
+                             tracked_changes=instrument.tracked_changes,
+                             new_instances=instrument.new_instances)
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 79f821a..6ca79ce 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -24,22 +24,17 @@ from ..storage.exceptions import StorageError
 _VERSION_ID_COL = 'version'
 _STUB = object()
 _INSTRUMENTED = {
-    # Node related stuff
-    _models.Node.runtime_properties: dict,
-    _models.Node.state: str,
-
-    # # Task related stuff
-    _models.Task.status: str,
-
-    # # Log related stuff
-    _models.Log.level: str,
-    _models.Log.msg: str,
-    _models.Log.traceback: str,
-    _models.Log.created_at: lambda date: date,
-    _models.Log.execution_fk: int,
-    _models.Log.task_fk: int,
+    'modified': {
+        _models.Node.runtime_properties: dict,
+        _models.Node.state: str,
+        # _models.Task.status: str,
+    },
+    'new': (_models.Log, )
+
 }
 
+_NEW_INSTANCE = 'NEW_INSTANCE'
+
 
 def track_changes(instrumented=None):
     """Track changes in the specified model columns
@@ -71,23 +66,45 @@ class _Instrumentation(object):
 
     def __init__(self, instrumented):
         self.tracked_changes = {}
+        self.new_instances = {}
         self.listeners = []
         self._track_changes(instrumented)
+        self._new_instance_index = 0
+
+    @property
+    def _new_instance_id(self):
+        self._new_instance_index += 1
+        return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, index=self._new_instance_index - 1)
 
     def _track_changes(self, instrumented):
-        instrumented_classes = {}
-        for instrumented_attribute, attribute_type in instrumented.items():
+        instrumented_attribute_classes = {}
+        for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
             self._register_set_attribute_listener(
                 instrumented_attribute=instrumented_attribute,
                 attribute_type=attribute_type)
             instrumented_class = instrumented_attribute.parent.entity
-            instrumented_class_attributes = instrumented_classes.setdefault(instrumented_class, {})
+            instrumented_class_attributes = instrumented_attribute_classes.setdefault(
+                instrumented_class, {})
             instrumented_class_attributes[instrumented_attribute.key] = attribute_type
-        for instrumented_class, instrumented_attributes in instrumented_classes.items():
-            self._register_instance_listeners(
+        for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items():
+            self._register_instance_attribute_listeners(
                 instrumented_class=instrumented_class,
                 instrumented_attributes=instrumented_attributes)
 
+        # instrument creation of new instances
+        for instrumented_class in instrumented.get('new', {}):
+            self._register_instance_listener(instrumented_class)
+
+    def _register_instance_listener(self, instrumented_class):
+        def listener(target, *args, **_):
+            mapi_name = target.__modelname__
+            tracked_instances = self.new_instances.setdefault(mapi_name, {})
+            tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {})
+            tracked_attributes.update(**args[1])
+        listener_args = (instrumented_class, 'init', listener)
+        sqlalchemy.event.listen(*listener_args)
+        self.listeners.append(listener_args)
+
     def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
         def listener(target, value, *_):
             mapi_name = target.__modelname__
@@ -103,7 +120,7 @@ class _Instrumentation(object):
         sqlalchemy.event.listen(*listener_args, retval=True)
         self.listeners.append(listener_args)
 
-    def _register_instance_listeners(self, instrumented_class, instrumented_attributes):
+    def _register_instance_attribute_listeners(self, instrumented_class, instrumented_attributes):
         def listener(target, *_):
             mapi_name = instrumented_class.__modelname__
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
@@ -136,6 +153,8 @@ class _Instrumentation(object):
         else:
             self.tracked_changes.clear()
 
+        self.new_instances.clear()
+
     def restore(self):
         """Remove all listeners registered by this instrumentation"""
         for listener_args in self.listeners:
@@ -171,7 +190,7 @@ class _Value(object):
         return {'initial': self.initial, 'current': self.current}.copy()
 
 
-def apply_tracked_changes(tracked_changes, model):
+def apply_tracked_changes(tracked_changes, new_instances, model):
     """Write tracked changes back to the database using provided model storage
 
     :param tracked_changes: The ``tracked_changes`` attribute of the instrumentation context
@@ -188,23 +207,21 @@ def apply_tracked_changes(tracked_changes, model):
                 instance = None
                 for attribute_name, value in tracked_attributes.items():
                     if value.initial != value.current:
-                        if not instance:
-                            # The object can be entirely new (Log is an example of this use case,
-                            # its id is None (or 'null'), thus we need to create it from scratch,
-                            # and not just update it.
-                            instance = \
-                                mapi.model_cls() if instance_id == 'null' else mapi.get(instance_id)
-
+                        instance = instance or mapi.get(instance_id)
                         setattr(instance, attribute_name, value.current)
                 if instance:
                     _validate_version_id(instance, mapi)
-                    # This follows the same logic as the same comment regarding 'null'
-                    if instance_id in ('null', None):
-                        mapi.put(instance)
-                    else:
-                        mapi.update(instance)
+                    mapi.update(instance)
                     successfully_updated_changes[mapi_name][instance_id] = [
                         v.dict for v in tracked_attributes.values()]
+
+        for mapi_name, new_instance in new_instances.items():
+            successfully_updated_changes[mapi_name] = dict()
+            mapi = getattr(model, mapi_name)
+            for new_instance_kwargs in sorted(new_instance.values()):
+                instance = mapi.model_cls(**new_instance_kwargs)
+                mapi.put(instance)
+                successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs
     except BaseException:
         for key, value in successfully_updated_changes.items():
             if not value:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
index 730d007..a22c4b1 100644
--- a/aria/storage/sql_mapi.py
+++ b/aria/storage/sql_mapi.py
@@ -397,7 +397,7 @@ def init_storage(base_dir, filename='db.sqlite'):
 
         path=os.path.join(base_dir, filename))
 
-    engine = create_engine(uri)
+    engine = create_engine(uri, )
     session_factory = orm.sessionmaker(bind=engine)
     session = orm.scoped_session(session_factory=session_factory)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/examples/hello-world/scripts/configure.sh
----------------------------------------------------------------------
diff --git a/examples/hello-world/scripts/configure.sh b/examples/hello-world/scripts/configure.sh
index 6e93053..b55ec17 100755
--- a/examples/hello-world/scripts/configure.sh
+++ b/examples/hello-world/scripts/configure.sh
@@ -8,7 +8,7 @@ if [ -d ${PYTHON_FILE_SERVER_ROOT} ]; then
 	echo "Removing file server root folder ${PYTHON_FILE_SERVER_ROOT}"
 	rm -rf ${PYTHON_FILE_SERVER_ROOT}
 fi
-#ctx logger info "Creating HTTP server root directory at ${PYTHON_FILE_SERVER_ROOT}"
+ctx logger info "Creating HTTP server root directory at ${PYTHON_FILE_SERVER_ROOT}"
 
 mkdir -p ${PYTHON_FILE_SERVER_ROOT}
 
@@ -17,7 +17,7 @@ cd ${PYTHON_FILE_SERVER_ROOT}
 index_path="index.html"
 image_path="images/aria-logo.png"
 
-#ctx logger info "Downloading blueprint resources..."
+ctx logger info "Downloading blueprint resources..."
 ctx download-resource-and-render ${PYTHON_FILE_SERVER_ROOT}/index.html ${index_path}
 ctx download-resource ${PYTHON_FILE_SERVER_ROOT}/aria-logo.png ${image_path}
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/examples/hello-world/scripts/start.sh
----------------------------------------------------------------------
diff --git a/examples/hello-world/scripts/start.sh b/examples/hello-world/scripts/start.sh
index 03c59d4..41b6ec7 100755
--- a/examples/hello-world/scripts/start.sh
+++ b/examples/hello-world/scripts/start.sh
@@ -6,16 +6,16 @@ TEMP_DIR="/tmp"
 PYTHON_FILE_SERVER_ROOT=${TEMP_DIR}/python-simple-http-webserver
 PID_FILE="server.pid"
 
-#ctx logger info "Starting HTTP server from ${PYTHON_FILE_SERVER_ROOT}"
+ctx logger info "Starting HTTP server from ${PYTHON_FILE_SERVER_ROOT}"
 
 port=$(ctx node properties port)
 
 cd ${PYTHON_FILE_SERVER_ROOT}
-#ctx logger info "Starting SimpleHTTPServer"
+ctx logger info "Starting SimpleHTTPServer"
 nohup python -m SimpleHTTPServer ${port} > /dev/null 2>&1 &
 echo $! > ${PID_FILE}
 
-#ctx logger info "Waiting for server to launch on port ${port}"
+ctx logger info "Waiting for server to launch on port ${port}"
 url="http://localhost:${port}"
 
 server_is_up() {

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/41cd1bee/examples/hello-world/scripts/stop.sh
----------------------------------------------------------------------
diff --git a/examples/hello-world/scripts/stop.sh b/examples/hello-world/scripts/stop.sh
index 63e1971..5461caf 100755
--- a/examples/hello-world/scripts/stop.sh
+++ b/examples/hello-world/scripts/stop.sh
@@ -8,8 +8,8 @@ PID_FILE="server.pid"
 
 PID=`cat ${PYTHON_FILE_SERVER_ROOT}/${PID_FILE}`
 
-#ctx logger info "Shutting down file server. pid = ${PID}"
+ctx logger info "Shutting down file server. pid = ${PID}"
 kill -9 ${PID} || exit $?
 
-#ctx logger info "Deleting file server root directory (${PYTHON_FILE_SERVER_ROOT})"
+ctx logger info "Deleting file server root directory (${PYTHON_FILE_SERVER_ROOT})"
 rm -rf ${PYTHON_FILE_SERVER_ROOT}