You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/23 22:13:08 UTC

incubator-ariatosca git commit: removed endpoint for session termination. clearing a session is done in-house

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution c1868c2b4 -> 0f2df7d1b


removed endpoint for session termination. clearing a session is done in-house


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0f2df7d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0f2df7d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0f2df7d1

Branch: refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution
Commit: 0f2df7d1b01fc515f578cc02ad9594330a5bbccf
Parents: c1868c2
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed May 24 01:13:04 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed May 24 01:13:04 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/context/operation.py          |  5 +++
 .../execution_plugin/ctx_proxy/client.py        | 34 +++++----------
 .../execution_plugin/ctx_proxy/server.py        | 46 +++++++++++---------
 aria/orchestrator/workflows/executor/process.py |  3 ++
 4 files changed, 46 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 52d3ab6..0ce790f 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -102,6 +102,11 @@ class BaseOperationContext(BaseContext):
                    destroy_session=True,
                    **kwargs)
 
+    def close(self):
+        if self._destroy_session:
+            self.model.log._session.remove()
+            self.model.log._engine.dispose()
+
 
 class NodeOperationContext(BaseOperationContext):
     """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/execution_plugin/ctx_proxy/client.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py
index fbfff57..f7f56aa 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/client.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py
@@ -89,29 +89,19 @@ def _process_args(json_prefix, args):
     return processed_args
 
 
-def _close_session(socket_url, timeout):
-    return _client_request(socket_url, [], timeout, 'DELETE')
-
-
 def main(args=None):
-    try:
-        args = _parse_args(args)
-        response = _client_request(
-            args.socket_url,
-            args=_process_args(args.json_arg_prefix, args.args),
-            timeout=args.timeout)
-        if args.json_output:
-            response = json.dumps(response)
-        else:
-            if not response:
-                response = ''
-            response = str(response)
-        sys.stdout.write(response)
-    except BaseException as origial_e:
-        try:
-            _close_session(args.socket_url, args.timeout)
-        except BaseException:
-            raise origial_e
+    args = _parse_args(args)
+    response = _client_request(
+        args.socket_url,
+        args=_process_args(args.json_arg_prefix, args.args),
+        timeout=args.timeout)
+    if args.json_output:
+        response = json.dumps(response)
+    else:
+        if not response:
+            response = ''
+        response = str(response)
+    sys.stdout.write(response)
 
 if __name__ == '__main__':
     main()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 6e5b0e3..d9195ff 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -27,7 +27,6 @@ import bottle
 from aria import modeling
 
 from .. import exceptions
-from . import client
 
 
 class CtxProxy(object):
@@ -38,21 +37,39 @@ class CtxProxy(object):
         self.port = _get_unused_port()
         self.socket_url = 'http://localhost:{0}'.format(self.port)
         self.server = None
+        self.bottle_server = None
         self._started = Queue.Queue(1)
+        self._terminated = Queue.Queue(1)
         self.thread = self._start_server()
-        self._started.get(timeout=5)
+        self._started.get(timeout=50)
 
     def _start_server(self):
         proxy = self
 
         class BottleServerAdapter(bottle.ServerAdapter):
+            def __init__(self, _session, _terminated, *args, **kwargs):
+                super(BottleServerAdapter, self).__init__(*args, **kwargs)
+                self._session = _session
+                self._terminated = _terminated
+
             def run(self, app):
+
                 class Server(wsgiref.simple_server.WSGIServer):
                     allow_reuse_address = True
+                    bottle_server = self
 
                     def handle_error(self, request, client_address):
                         pass
 
+                    def serve_forever(self, poll_interval=0.5):
+                        try:
+                            wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval)
+                            # Once shutdown is called, we need to close the session
+                            self.bottle_server._session.remove()
+                        finally:
+                            # only after we tried to close the session, we can proceed.
+                            self.bottle_server._terminated.put(True)
+
                 class Handler(wsgiref.simple_server.WSGIRequestHandler):
                     def address_string(self):
                         return self.client_address[0]
@@ -77,46 +94,36 @@ class CtxProxy(object):
 
             bottle_app = bottle.Bottle()
             bottle_app.post('/', callback=self._request_handler)
-            bottle_app.delete('/', callback=self._teardown_handler)
             bottle.run(
                 app=bottle_app,
                 host='localhost',
                 port=self.port,
                 quiet=True,
-                server=BottleServerAdapter)
+                server=BottleServerAdapter,
+                _session=proxy.ctx.model.log._session,
+                _terminated=self._terminated)
         thread = threading.Thread(target=serve)
         thread.daemon = True
         thread.start()
         return thread
 
     def close(self):
-        client._close_session(self.socket_url, 10)
         if self.server:
             self.server.shutdown()
+            while self._started.not_empty and self._terminated.empty():
+                # wait for the process of shutdown to complete (i.e. the session is terminated)
+                pass
             self.server.server_close()
 
     def _request_handler(self):
-        return self._handle(self._process)
-
-    def _teardown_handler(self):
-        return self._handle(self._close_session)
-
-    def _handle(self, handler):
         request = bottle.request.body.read()  # pylint: disable=no-member
-        response = handler(request)
+        response = self._process(request)
         return bottle.LocalResponse(
             body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder),
             status=200,
             headers={'content-type': 'application/json'}
         )
 
-    def _close_session(self, *args, **kwargs):
-        # Since this runs in a daemon thread, we need to close the session each time we process
-        # a request (a new session would be supplied by SQLAlchemy scoped_session).
-        # log mapi is chosen arbitrarily.
-        self.ctx.model.log._session.remove()
-        return {}
-
     def _process(self, request):
         try:
             typed_request = json.loads(request)
@@ -128,7 +135,6 @@ class CtxProxy(object):
                 result_type = 'stop_operation'
             result = {'type': result_type, 'payload': payload}
         except Exception as e:
-            self._close_session()
             traceback_out = StringIO.StringIO()
             traceback.print_exc(file=traceback_out)
             payload = {

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index da6bbb2..23cf9ff 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -387,13 +387,16 @@ def _main():
             for decorate in process_executor.decorate():
                 task_func = decorate(task_func)
             task_func(ctx=ctx, **operation_inputs)
+            ctx.close()
             messenger.succeeded(tracked_changes=instrument.tracked_changes,
                                 new_instances=instrument.new_instances)
         except BaseException as e:
+            ctx.close()
             messenger.failed(exception=e,
                              tracked_changes=instrument.tracked_changes,
                              new_instances=instrument.new_instances)
         finally:
+            ctx.close()
             instrument.expunge_session()
 
 if __name__ == '__main__':