You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/05/23 22:13:08 UTC
incubator-ariatosca git commit: removed endpoint for session
terminaton. clearing a session is done inhouse
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution c1868c2b4 -> 0f2df7d1b
removed endpoint for session terminaton. clearing a session is done inhouse
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0f2df7d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0f2df7d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0f2df7d1
Branch: refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution
Commit: 0f2df7d1b01fc515f578cc02ad9594330a5bbccf
Parents: c1868c2
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed May 24 01:13:04 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed May 24 01:13:04 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/context/operation.py | 5 +++
.../execution_plugin/ctx_proxy/client.py | 34 +++++----------
.../execution_plugin/ctx_proxy/server.py | 46 +++++++++++---------
aria/orchestrator/workflows/executor/process.py | 3 ++
4 files changed, 46 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 52d3ab6..0ce790f 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -102,6 +102,11 @@ class BaseOperationContext(BaseContext):
destroy_session=True,
**kwargs)
+ def close(self):
+ if self._destroy_session:
+ self.model.log._session.remove()
+ self.model.log._engine.dispose()
+
class NodeOperationContext(BaseOperationContext):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/execution_plugin/ctx_proxy/client.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py
index fbfff57..f7f56aa 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/client.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py
@@ -89,29 +89,19 @@ def _process_args(json_prefix, args):
return processed_args
-def _close_session(socket_url, timeout):
- return _client_request(socket_url, [], timeout, 'DELETE')
-
-
def main(args=None):
- try:
- args = _parse_args(args)
- response = _client_request(
- args.socket_url,
- args=_process_args(args.json_arg_prefix, args.args),
- timeout=args.timeout)
- if args.json_output:
- response = json.dumps(response)
- else:
- if not response:
- response = ''
- response = str(response)
- sys.stdout.write(response)
- except BaseException as origial_e:
- try:
- _close_session(args.socket_url, args.timeout)
- except BaseException:
- raise origial_e
+ args = _parse_args(args)
+ response = _client_request(
+ args.socket_url,
+ args=_process_args(args.json_arg_prefix, args.args),
+ timeout=args.timeout)
+ if args.json_output:
+ response = json.dumps(response)
+ else:
+ if not response:
+ response = ''
+ response = str(response)
+ sys.stdout.write(response)
if __name__ == '__main__':
main()
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 6e5b0e3..d9195ff 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -27,7 +27,6 @@ import bottle
from aria import modeling
from .. import exceptions
-from . import client
class CtxProxy(object):
@@ -38,21 +37,39 @@ class CtxProxy(object):
self.port = _get_unused_port()
self.socket_url = 'http://localhost:{0}'.format(self.port)
self.server = None
+ self.bottle_server = None
self._started = Queue.Queue(1)
+ self._terminated = Queue.Queue(1)
self.thread = self._start_server()
- self._started.get(timeout=5)
+ self._started.get(timeout=50)
def _start_server(self):
proxy = self
class BottleServerAdapter(bottle.ServerAdapter):
+ def __init__(self, _session, _terminated, *args, **kwargs):
+ super(BottleServerAdapter, self).__init__(*args, **kwargs)
+ self._session = _session
+ self._terminated = _terminated
+
def run(self, app):
+
class Server(wsgiref.simple_server.WSGIServer):
allow_reuse_address = True
+ bottle_server = self
def handle_error(self, request, client_address):
pass
+ def serve_forever(self, poll_interval=0.5):
+ try:
+ wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval)
+ # Once shutdown is called, we need to close the session
+ self.bottle_server._session.remove()
+ finally:
+ # only after we tried to close the session, we can proceed.
+ self.bottle_server._terminated.put(True)
+
class Handler(wsgiref.simple_server.WSGIRequestHandler):
def address_string(self):
return self.client_address[0]
@@ -77,46 +94,36 @@ class CtxProxy(object):
bottle_app = bottle.Bottle()
bottle_app.post('/', callback=self._request_handler)
- bottle_app.delete('/', callback=self._teardown_handler)
bottle.run(
app=bottle_app,
host='localhost',
port=self.port,
quiet=True,
- server=BottleServerAdapter)
+ server=BottleServerAdapter,
+ _session=proxy.ctx.model.log._session,
+ _terminated=self._terminated)
thread = threading.Thread(target=serve)
thread.daemon = True
thread.start()
return thread
def close(self):
- client._close_session(self.socket_url, 10)
if self.server:
self.server.shutdown()
+ while self._started.not_empty and self._terminated.empty():
+ # wait for the process of shutdown to complete (i.e. the session is terminated)
+ pass
self.server.server_close()
def _request_handler(self):
- return self._handle(self._process)
-
- def _teardown_handler(self):
- return self._handle(self._close_session)
-
- def _handle(self, handler):
request = bottle.request.body.read() # pylint: disable=no-member
- response = handler(request)
+ response = self._process(request)
return bottle.LocalResponse(
body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder),
status=200,
headers={'content-type': 'application/json'}
)
- def _close_session(self, *args, **kwargs):
- # Since this runs in a daemon thread, we need to close the session each time we process
- # a request (a new session would be supplied by SQLAlchemy scoped_session).
- # log mapi is chosen arbitrarily.
- self.ctx.model.log._session.remove()
- return {}
-
def _process(self, request):
try:
typed_request = json.loads(request)
@@ -128,7 +135,6 @@ class CtxProxy(object):
result_type = 'stop_operation'
result = {'type': result_type, 'payload': payload}
except Exception as e:
- self._close_session()
traceback_out = StringIO.StringIO()
traceback.print_exc(file=traceback_out)
payload = {
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index da6bbb2..23cf9ff 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -387,13 +387,16 @@ def _main():
for decorate in process_executor.decorate():
task_func = decorate(task_func)
task_func(ctx=ctx, **operation_inputs)
+ ctx.close()
messenger.succeeded(tracked_changes=instrument.tracked_changes,
new_instances=instrument.new_instances)
except BaseException as e:
+ ctx.close()
messenger.failed(exception=e,
tracked_changes=instrument.tracked_changes,
new_instances=instrument.new_instances)
finally:
+ ctx.close()
instrument.expunge_session()
if __name__ == '__main__':