You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2014/02/26 21:04:57 UTC
svn commit: r1572224 - in /qpid/proton/trunk: proton-c/bindings/python/
proton-c/include/proton/ proton-c/src/posix/ proton-j/src/main/resources/
tests/python/proton_tests/
Author: kgiusti
Date: Wed Feb 26 20:04:56 2014
New Revision: 1572224
URL: http://svn.apache.org/r1572224
Log:
PROTON-487: Prevent leaks of endpoints and delivery objects in the python binding
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/bindings/python/python.i
qpid/proton/trunk/proton-c/include/proton/cproton.i
qpid/proton/trunk/proton-c/src/posix/driver.c
qpid/proton/trunk/proton-j/src/main/resources/cengine.py
qpid/proton/trunk/tests/python/proton_tests/common.py
qpid/proton/trunk/tests/python/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1572224&r1=1572223&r2=1572224&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Wed Feb 26 20:04:56 2014
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,6 +31,7 @@ The proton APIs consist of the following
"""
from cproton import *
+import weakref
try:
import uuid
except ImportError:
@@ -2122,30 +2123,41 @@ def obj2dat(obj, dimpl):
d = Data(dimpl)
d.put_object(obj)
-def wrap_connection(conn):
- if not conn: return None
- ctx = pn_connection_get_context(conn)
- if ctx: return ctx
- wrapper = Connection(_conn=conn)
- return wrapper
-
class Connection(Endpoint):
+ @staticmethod
+ def _wrap_connection(c_conn):
+ """Maintain only a single instance of this class for each Connection
+ object that exists in the C Engine. This is done by storing a (weak)
+ reference to the python instance in the context field of the C object.
+ """
+ if not c_conn: return None
+ py_conn = pn_connection_get_context(c_conn)
+ if py_conn: return py_conn
+ wrapper = Connection(_conn=c_conn)
+ return wrapper
+
def __init__(self, _conn=None):
Endpoint.__init__(self)
if _conn:
self._conn = _conn
else:
self._conn = pn_connection()
- pn_connection_set_context(self._conn, self)
+ pn_connection_set_context(self._conn, self)
self.offered_capabilities = None
self.desired_capabilities = None
self.properties = None
+ self._sessions = set()
def __del__(self):
- if hasattr(self, "_conn"):
+ if hasattr(self, "_conn") and self._conn:
+ # pn_connection_free will release all child sessions in the C Engine, so
+ # free all child python Sessions to avoid dangling references
+ if hasattr(self, "_sessions") and self._sessions:
+ for s in self._sessions:
+ s._release()
+ pn_connection_set_context(self._conn, None)
pn_connection_free(self._conn)
- del self._conn
def _check(self, err):
if err < 0:
@@ -2165,6 +2177,9 @@ class Connection(Endpoint):
pn_connection_collect(self._conn, None)
else:
pn_connection_collect(self._conn, collector._impl)
+ # XXX: we can't let coll go out of scope or the connection will be
+ # pointing to garbage
+ self._collector = collector
def _get_container(self):
return pn_connection_get_container(self._conn)
@@ -2217,17 +2232,17 @@ class Connection(Endpoint):
return pn_connection_state(self._conn)
def session(self):
- return wrap_session(pn_session(self._conn))
+ return Session._wrap_session(pn_session(self._conn))
def session_head(self, mask):
- return wrap_session(pn_session_head(self._conn, mask))
+ return Session._wrap_session(pn_session_head(self._conn, mask))
def link_head(self, mask):
- return wrap_link(pn_link_head(self._conn, mask))
+ return Link._wrap_link(pn_link_head(self._conn, mask))
@property
def work_head(self):
- return wrap_delivery(pn_work_head(self._conn))
+ return Delivery._wrap_delivery(pn_work_head(self._conn))
@property
def error(self):
@@ -2236,26 +2251,47 @@ class Connection(Endpoint):
class SessionException(ProtonException):
pass
-def wrap_session(ssn):
- if ssn is None: return None
- ctx = pn_session_get_context(ssn)
- if ctx:
- return ctx
- else:
- wrapper = Session(ssn)
- pn_session_set_context(ssn, wrapper)
- return wrapper
-
class Session(Endpoint):
+ @staticmethod
+ def _wrap_session(c_ssn):
+ """Maintain only a single instance of this class for each Session object that
+ exists in the C Engine.
+ """
+ if c_ssn is None: return None
+ py_ssn = pn_session_get_context(c_ssn)
+ if py_ssn: return py_ssn
+ wrapper = Session(c_ssn)
+ return wrapper
+
def __init__(self, ssn):
Endpoint.__init__(self)
self._ssn = ssn
-
- def __del__(self):
- if hasattr(self, "_ssn"):
+ pn_session_set_context(self._ssn, self)
+ self._links = set()
+ self.connection._sessions.add(self)
+
+ def _release(self):
+ """Release the underlying C Engine resource."""
+ if self._ssn:
+ # pn_session_free will release all child links in the C Engine, so free
+ # all child python Links to avoid dangling references
+ for l in self._links:
+ l._release()
+ pn_session_set_context(self._ssn, None)
pn_session_free(self._ssn)
- del self._ssn
+ self._ssn = None
+
+ def free(self):
+ """Release the Session, freeing its resources.
+
+ Call this when you no longer need the session. This will allow the
+ session's resources to be reclaimed. Once called, you should no longer
+ reference the session.
+
+ """
+ self.connection._sessions.remove(self)
+ self._release()
def _get_cond_impl(self):
return pn_session_condition(self._ssn)
@@ -2287,7 +2323,7 @@ class Session(Endpoint):
pn_session_close(self._ssn)
def next(self, mask):
- return wrap_session(pn_session_next(self._ssn, mask))
+ return Session._wrap_session(pn_session_next(self._ssn, mask))
@property
def state(self):
@@ -2295,30 +2331,17 @@ class Session(Endpoint):
@property
def connection(self):
- return wrap_connection(pn_session_connection(self._ssn))
+ return Connection._wrap_connection(pn_session_connection(self._ssn))
def sender(self, name):
- return wrap_link(pn_sender(self._ssn, name))
+ return Link._wrap_link(pn_sender(self._ssn, name))
def receiver(self, name):
- return wrap_link(pn_receiver(self._ssn, name))
+ return Link._wrap_link(pn_receiver(self._ssn, name))
class LinkException(ProtonException):
pass
-def wrap_link(link):
- if link is None: return None
- ctx = pn_link_get_context(link)
- if ctx:
- return ctx
- else:
- if pn_link_is_sender(link):
- wrapper = Sender(link)
- else:
- wrapper = Receiver(link)
- pn_link_set_context(link, wrapper)
- return wrapper
-
class Link(Endpoint):
SND_UNSETTLED = PN_SND_UNSETTLED
@@ -2328,14 +2351,42 @@ class Link(Endpoint):
RCV_FIRST = PN_RCV_FIRST
RCV_SECOND = PN_RCV_SECOND
- def __init__(self, link):
- Endpoint.__init__(self)
- self._link = link
+ @staticmethod
+ def _wrap_link(c_link):
+ """Maintain only a single instance of this class for each Link object that
+ exists in the C Engine.
+ """
+ if c_link is None: return None
+ py_link = pn_link_get_context(c_link)
+ if py_link: return py_link
+ if pn_link_is_sender(c_link):
+ wrapper = Sender(c_link)
+ else:
+ wrapper = Receiver(c_link)
+ return wrapper
- def __del__(self):
- if hasattr(self, "_link"):
+ def __init__(self, c_link):
+ Endpoint.__init__(self)
+ self._link = c_link
+ pn_link_set_context(self._link, self)
+ self._deliveries = set()
+ self.session._links.add(self)
+
+ def _release(self):
+ """Release the underlying C Engine resource."""
+ if self._link:
+ # pn_link_free will settle all child deliveries in the C Engine, so free
+ # all child python deliveries to avoid dangling references
+ for d in self._deliveries:
+ d._release()
+ pn_link_set_context(self._link, None)
pn_link_free(self._link)
- del self._link
+ self._link = None
+
+ def free(self):
+ """Release the Link, freeing its resources"""
+ self.session._links.remove(self)
+ self._release()
def _check(self, err):
if err < 0:
@@ -2378,14 +2429,14 @@ class Link(Endpoint):
@property
def session(self):
- return wrap_session(pn_link_session(self._link))
+ return Session._wrap_session(pn_link_session(self._link))
def delivery(self, tag):
- return wrap_delivery(pn_delivery(self._link, tag))
+ return Delivery._wrap_delivery(pn_delivery(self._link, tag))
@property
def current(self):
- return wrap_delivery(pn_link_current(self._link))
+ return Delivery._wrap_delivery(pn_link_current(self._link))
def advance(self):
return pn_link_advance(self._link)
@@ -2407,7 +2458,7 @@ class Link(Endpoint):
return pn_link_queued(self._link)
def next(self, mask):
- return wrap_link(pn_link_next(self._link, mask))
+ return Link._wrap_link(pn_link_next(self._link, mask))
@property
def name(self):
@@ -2530,9 +2581,11 @@ class Terminus(object):
def copy(self, src):
self._check(pn_terminus_copy(self._impl, src._impl))
-
class Sender(Link):
+ def __init__(self, c_link):
+ super(Sender, self).__init__(c_link)
+
def offered(self, n):
pn_link_offered(self._link, n)
@@ -2541,6 +2594,9 @@ class Sender(Link):
class Receiver(Link):
+ def __init__(self, c_link):
+ super(Receiver, self).__init__(c_link)
+
def flow(self, n):
pn_link_flow(self._link, n)
@@ -2558,14 +2614,6 @@ class Receiver(Link):
def draining(self):
return pn_link_draining(self._link)
-def wrap_delivery(dlv):
- if not dlv: return None
- ctx = pn_delivery_get_context(dlv)
- if ctx: return ctx
- wrapper = Delivery(dlv)
- pn_delivery_set_context(dlv, wrapper)
- return wrapper
-
class Disposition(object):
RECEIVED = PN_RECEIVED
@@ -2653,10 +2701,30 @@ class Delivery(object):
RELEASED = Disposition.RELEASED
MODIFIED = Disposition.MODIFIED
+ @staticmethod
+ def _wrap_delivery(c_dlv):
+ """Maintain only a single instance of this class for each Delivery object that
+ exists in the C Engine.
+ """
+ if not c_dlv: return None
+ py_dlv = pn_delivery_get_context(c_dlv)
+ if py_dlv: return py_dlv
+ wrapper = Delivery(c_dlv)
+ return wrapper
+
def __init__(self, dlv):
self._dlv = dlv
+ pn_delivery_set_context(self._dlv, self)
self.local = Disposition(pn_delivery_local(self._dlv), True)
self.remote = Disposition(pn_delivery_remote(self._dlv), False)
+ self.link._deliveries.add(self)
+
+ def _release(self):
+ """Release the underlying C Engine resource."""
+ if self._dlv:
+ pn_delivery_set_context(self._dlv, None)
+ pn_delivery_settle(self._dlv)
+ self._dlv = None
@property
def tag(self):
@@ -2701,15 +2769,17 @@ class Delivery(object):
return pn_delivery_settled(self._dlv)
def settle(self):
- pn_delivery_settle(self._dlv)
+ """Release the delivery"""
+ self.link._deliveries.remove(self)
+ self._release()
@property
def work_next(self):
- return wrap_delivery(pn_work_next(self._dlv))
+ return Delivery._wrap_delivery(pn_work_next(self._dlv))
@property
def link(self):
- return wrap_link(pn_delivery_link(self._dlv))
+ return Link._wrap_link(pn_delivery_link(self._dlv))
class TransportException(ProtonException):
pass
@@ -2752,7 +2822,15 @@ class Transport(object):
return err
def bind(self, connection):
+ """Assign a connection to the transport"""
self._check(pn_transport_bind(self._trans, connection._conn))
+ # keep python connection from being garbage collected:
+ self._connection = connection
+
+ def unbind(self):
+ """Release the connection"""
+ self._check(pn_transport_unbind(self._trans))
+ self._connection = None
def trace(self, n):
pn_transport_trace(self._trans, n)
@@ -3094,10 +3172,10 @@ class Collector:
else:
tp = None
return Event(type=pn_event_type(event),
- connection=wrap_connection(pn_event_connection(event)),
- session=wrap_session(pn_event_session(event)),
- link=wrap_link(pn_event_link(event)),
- delivery=wrap_delivery(pn_event_delivery(event)),
+ connection=Connection._wrap_connection(pn_event_connection(event)),
+ session=Session._wrap_session(pn_event_session(event)),
+ link=Link._wrap_link(pn_event_link(event)),
+ delivery=Delivery._wrap_delivery(pn_event_delivery(event)),
transport=tp)
def pop(self):
@@ -3139,27 +3217,55 @@ class DriverException(ProtonException):
"""
pass
+class Connector(object):
-def wrap_connector(cxtr):
- if not cxtr: return None
- ctx = pn_connector_context(cxtr)
- if ctx: return ctx
- wrapper = Connector(_cxtr=cxtr)
- pn_connector_set_context(cxtr, wrapper)
- return wrapper
+ @staticmethod
+ def _wrap_connector(c_cxtr, py_driver=None):
+ """Maintain only a single instance of this class for each Connector object that
+ exists in the C Driver.
+ """
+ if not c_cxtr: return None
+ py_cxtr = pn_connector_context(c_cxtr)
+ if py_cxtr: return py_cxtr
+ wrapper = Connector(_cxtr=c_cxtr, _py_driver=py_driver)
+ return wrapper
-class Connector(object):
- def __init__(self, _cxtr):
+ def __init__(self, _cxtr, _py_driver):
self._cxtr = _cxtr
+ assert(_py_driver)
+ self._driver = weakref.ref(_py_driver)
+ pn_connector_set_context(self._cxtr, self)
+ self._connection = None
+ self._driver()._connectors.add(self)
+
+ def _release(self):
+ """Release the underlying C Engine resource."""
+ if self._cxtr:
+ pn_connector_set_context(self._cxtr, None)
+ pn_connector_free(self._cxtr)
+ self._cxtr = None
+
+ def free(self):
+ """Release the Connector, freeing its resources.
+
+ Call this when you no longer need the Connector. This will allow the
+ connector's resources to be reclaimed. Once called, you should no longer
+ reference this connector.
+
+ """
+ self.connection = None
+ d = self._driver()
+ if d: d._connectors.remove(self)
+ self._release()
def next(self):
- return wrap_connector(pn_connector_next(self._cxtr))
+ return Connector._wrap_connector(pn_connector_next(self._cxtr))
def process(self):
pn_connector_process(self._cxtr)
def listener(self):
- return wrap_listener(pn_connector_listener(self._cxtr))
+ return Listener._wrap_listener(pn_connector_listener(self._cxtr))
def sasl(self):
## seems easier just to grab the SASL associated with the transport:
@@ -3183,34 +3289,64 @@ class Connector(object):
return pn_connector_closed(self._cxtr)
def _get_connection(self):
- return wrap_connection(pn_connector_connection(self._cxtr))
+ return self._connection
def _set_connection(self, conn):
- pn_connector_set_connection(self._cxtr, conn._conn)
+ if conn:
+ pn_connector_set_connection(self._cxtr, conn._conn)
+ else:
+ pn_connector_set_connection(self._cxtr, None)
+ self._connection = conn
+
connection = property(_get_connection, _set_connection,
doc="""
Associate a Connection with this Connector.
""")
-def wrap_listener(lsnr):
- if not lsnr: return None
- ctx = pn_listener_context(lsnr)
- if ctx: return ctx
- wrapper = Listener(_lsnr=lsnr)
- pn_listener_set_context(lsnr, wrapper)
- return wrapper
-
class Listener(object):
- def __init__(self, _lsnr=None):
+
+ @staticmethod
+ def _wrap_listener(c_lsnr, py_driver=None):
+ """Maintain only a single instance of this class for each Listener object that
+ exists in the C Driver.
+ """
+ if not c_lsnr: return None
+ py_lsnr = pn_listener_context(c_lsnr)
+ if py_lsnr: return py_lsnr
+ wrapper = Listener(_lsnr=c_lsnr, _py_driver=py_driver)
+ return wrapper
+
+ def __init__(self, _lsnr, _py_driver):
self._lsnr = _lsnr
+ assert(_py_driver)
+ self._driver = weakref.ref(_py_driver)
+ pn_listener_set_context(self._lsnr, self)
+ self._driver()._listeners.add(self)
+
+ def _release(self):
+ """Release the underlying C Engine resource."""
+ if self._lsnr:
+ pn_listener_set_context(self._lsnr, None);
+ pn_listener_free(self._lsnr)
+ self._lsnr = None
+
+ def free(self):
+ """Release the Listener, freeing its resources"""
+ d = self._driver()
+ if d: d._listeners.remove(self)
+ self._release()
def next(self):
- return wrap_listener(pn_listener_next(self._lsnr))
+ return Listener._wrap_listener(pn_listener_next(self._lsnr))
def accept(self):
- cxtr = pn_listener_accept(self._lsnr)
- return wrap_connector(cxtr)
+ d = self._driver()
+ if d:
+ cxtr = pn_listener_accept(self._lsnr)
+ c = Connector._wrap_connector(cxtr, d)
+ return c
+ return None
def close(self):
pn_listener_close(self._lsnr)
@@ -3218,9 +3354,17 @@ class Listener(object):
class Driver(object):
def __init__(self):
self._driver = pn_driver()
+ self._listeners = set()
+ self._connectors = set()
def __del__(self):
- if hasattr(self, "_driver"):
+ # freeing the driver will release all child objects in the C Engine, so
+ # clean up their references in the corresponding Python objects
+ for c in self._connectors:
+ c._release()
+ for l in self._listeners:
+ l._release()
+ if hasattr(self, "_driver") and self._driver:
pn_driver_free(self._driver)
del self._driver
@@ -3235,22 +3379,25 @@ class Driver(object):
return pn_driver_wakeup(self._driver)
def listener(self, host, port):
- return wrap_listener(pn_listener(self._driver, host, port, None))
+ """Construct a listener"""
+ return Listener._wrap_listener(pn_listener(self._driver, host, port, None),
+ self)
def pending_listener(self):
- return wrap_listener(pn_driver_listener(self._driver))
+ return Listener._wrap_listener(pn_driver_listener(self._driver))
def head_listener(self):
- return wrap_listener(pn_listener_head(self._driver))
+ return Listener._wrap_listener(pn_listener_head(self._driver))
def connector(self, host, port):
- return wrap_connector(pn_connector(self._driver, host, port, None))
+ return Connector._wrap_connector(pn_connector(self._driver, host, port, None),
+ self)
def head_connector(self):
- return wrap_connector(pn_connector_head(self._driver))
+ return Connector._wrap_connector(pn_connector_head(self._driver))
def pending_connector(self):
- return wrap_connector(pn_driver_connector(self._driver))
+ return Connector._wrap_connector(pn_driver_connector(self._driver))
__all__ = [
"API_LANGUAGE",
Modified: qpid/proton/trunk/proton-c/bindings/python/python.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/python.i?rev=1572224&r1=1572223&r2=1572224&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/python.i (original)
+++ qpid/proton/trunk/proton-c/bindings/python/python.i Wed Feb 26 20:04:56 2014
@@ -183,19 +183,22 @@ ssize_t pn_transport_input(pn_transport_
%}
%ignore pn_message_data;
-%rename(pn_listener) wrap_pn_listener;
+%rename(pn_listener_set_context) wrap_pn_listener_set_context;
%inline {
- pn_listener_t *wrap_pn_listener(pn_driver_t *driver, const char *host, const char *port, PyObject *context) {
- Py_XINCREF(context);
- return pn_listener(driver, host, port, context);
+ void wrap_pn_listener_set_context(pn_listener_t *l, PyObject *context) {
+ // don't incref context: we 'borrow' the reference - prevents
+ // reference loops. Should be safe as the Python object must
+ // outlive the C object.
+ pn_listener_set_context(l, context);
}
}
-%ignore pn_listener;
+%ignore pn_listener_set_context;
%rename(pn_listener_context) wrap_pn_listener_context;
%inline {
PyObject *wrap_pn_listener_context(pn_listener_t *l) {
PyObject *result = (PyObject *) pn_listener_context(l);
+ // incref the returned context, as the caller expects this
if (result) {
Py_INCREF(result);
return result;
@@ -206,39 +209,22 @@ ssize_t pn_transport_input(pn_transport_
}
%ignore pn_listener_context;
-%rename(pn_listener_set_context) wrap_pn_listener_set_context;
-%inline {
- void wrap_pn_listener_set_context(pn_listener_t *l, PyObject *context) {
- Py_XDECREF((PyObject *)pn_listener_context(l));
- Py_XINCREF(context);
- pn_listener_set_context(l, context);
- }
-}
-%ignore pn_listener_set_context;
-
-%rename(pn_listener_free) wrap_pn_listener_free;
-%inline %{
- void wrap_pn_listener_free(pn_listener_t *l) {
- PyObject *obj = (PyObject *) pn_listener_context(l);
- Py_XDECREF(obj);
- pn_listener_free(l);
- }
-%}
-%ignore pn_listener_free;
-
-%rename(pn_connector) wrap_pn_connector;
+%rename(pn_connector_set_context) wrap_pn_connector_set_context;
%inline {
- pn_connector_t *wrap_pn_connector(pn_driver_t *driver, const char *host, const char *port, PyObject *context) {
- Py_XINCREF(context);
- return pn_connector(driver, host, port, context);
+ void wrap_pn_connector_set_context(pn_connector_t *c, PyObject *context) {
+ // don't incref context: we 'borrow' the reference - prevents
+ // reference loops. Should be safe as the Python object must
+ // outlive the C object.
+ pn_connector_set_context(c, context);
}
}
-%ignore pn_connector;
+%ignore pn_connector_set_context;
%rename(pn_connector_context) wrap_pn_connector_context;
%inline {
PyObject *wrap_pn_connector_context(pn_connector_t *c) {
PyObject *result = (PyObject *) pn_connector_context(c);
+ // incref the returned context, as the caller expects this
if (result) {
Py_INCREF(result);
return result;
@@ -249,30 +235,11 @@ ssize_t pn_transport_input(pn_transport_
}
%ignore pn_connector_context;
-%rename(pn_connector_set_context) wrap_pn_connector_set_context;
-%inline {
- void wrap_pn_connector_set_context(pn_connector_t *ctor, PyObject *context) {
- Py_XDECREF((PyObject *)pn_connector_context(ctor));
- Py_XINCREF(context);
- pn_connector_set_context(ctor, context);
- }
-}
-%ignore pn_connector_set_context;
-
-%rename(pn_connector_free) wrap_pn_connector_free;
-%inline %{
- void wrap_pn_connector_free(pn_connector_t *c) {
- PyObject *obj = (PyObject *) pn_connector_context(c);
- Py_XDECREF(obj);
- pn_connector_free(c);
- }
-%}
-%ignore pn_connector_free;
-
%rename(pn_connection_get_context) wrap_pn_connection_get_context;
%inline {
PyObject *wrap_pn_connection_get_context(pn_connection_t *c) {
PyObject *result = (PyObject *) pn_connection_get_context(c);
+ // incref the returned context, as the caller expects this
if (result) {
Py_INCREF(result);
return result;
@@ -286,27 +253,17 @@ ssize_t pn_transport_input(pn_transport_
%rename(pn_connection_set_context) wrap_pn_connection_set_context;
%inline {
void wrap_pn_connection_set_context(pn_connection_t *c, PyObject *context) {
- Py_XDECREF((PyObject *)pn_connection_get_context(c));
- Py_XINCREF(context);
+ // don't incref context: we 'borrow' the reference
pn_connection_set_context(c, context);
}
}
%ignore pn_connection_set_context;
-%rename(pn_connection_free) wrap_pn_connection_free;
-%inline %{
- void wrap_pn_connection_free(pn_connection_t *c) {
- PyObject *obj = (PyObject *) pn_connection_get_context(c);
- Py_XDECREF(obj);
- pn_connection_free(c);
- }
-%}
-%ignore pn_connection_free;
-
%rename(pn_session_get_context) wrap_pn_session_get_context;
%inline {
PyObject *wrap_pn_session_get_context(pn_session_t *s) {
PyObject *result = (PyObject *) pn_session_get_context(s);
+ // incref the returned context, as the caller expects this
if (result) {
Py_INCREF(result);
return result;
@@ -320,27 +277,17 @@ ssize_t pn_transport_input(pn_transport_
%rename(pn_session_set_context) wrap_pn_session_set_context;
%inline {
void wrap_pn_session_set_context(pn_session_t *s, PyObject *context) {
- Py_XDECREF((PyObject *)pn_session_get_context(s));
- Py_XINCREF(context);
+ // don't incref context: we 'borrow' the reference
pn_session_set_context(s, context);
}
}
%ignore pn_session_set_context;
-%rename(pn_session_free) wrap_pn_session_free;
-%inline %{
- void wrap_pn_session_free(pn_session_t *s) {
- PyObject *obj = (PyObject *) pn_session_get_context(s);
- Py_XDECREF(obj);
- pn_session_free(s);
- }
-%}
-%ignore pn_session_free;
-
%rename(pn_link_get_context) wrap_pn_link_get_context;
%inline {
PyObject *wrap_pn_link_get_context(pn_link_t *l) {
PyObject *result = (PyObject *) pn_link_get_context(l);
+ // incref the returned context, as the caller expects this
if (result) {
Py_INCREF(result);
return result;
@@ -354,27 +301,17 @@ ssize_t pn_transport_input(pn_transport_
%rename(pn_link_set_context) wrap_pn_link_set_context;
%inline {
void wrap_pn_link_set_context(pn_link_t *l, PyObject *context) {
- Py_XDECREF((PyObject *)pn_link_get_context(l));
- Py_XINCREF(context);
+ // don't incref context: we 'borrow' the reference
pn_link_set_context(l, context);
}
}
%ignore pn_link_set_context;
-%rename(pn_link_free) wrap_pn_link_free;
-%inline %{
- void wrap_pn_link_free(pn_link_t *l) {
- PyObject *obj = (PyObject *) pn_link_get_context(l);
- Py_XDECREF(obj);
- pn_link_free(l);
- }
-%}
-%ignore pn_link_free;
-
%rename(pn_delivery_get_context) wrap_pn_delivery_get_context;
%inline {
PyObject *wrap_pn_delivery_get_context(pn_delivery_t *d) {
PyObject *result = (PyObject *) pn_delivery_get_context(d);
+ // incref the returned context, as the caller expects this
if (result) {
Py_INCREF(result);
return result;
@@ -388,23 +325,12 @@ ssize_t pn_transport_input(pn_transport_
%rename(pn_delivery_set_context) wrap_pn_delivery_set_context;
%inline {
void wrap_pn_delivery_set_context(pn_delivery_t *d, PyObject *context) {
- Py_XDECREF((PyObject *)pn_delivery_get_context(d));
- Py_XINCREF(context);
+ // don't incref context: we 'borrow' the reference
pn_delivery_set_context(d, context);
}
}
%ignore pn_delivery_set_context;
-%rename(pn_delivery_settle) wrap_pn_delivery_settle;
-%inline %{
- void wrap_pn_delivery_settle(pn_delivery_t *d) {
- PyObject *obj = (PyObject *) pn_delivery_get_context(d);
- Py_XDECREF(obj);
- pn_delivery_settle(d);
- }
-%}
-%ignore pn_delivery_settle;
-
ssize_t pn_data_decode(pn_data_t *data, char *STRING, size_t LENGTH);
%ignore pn_data_decode;
Modified: qpid/proton/trunk/proton-c/include/proton/cproton.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/cproton.i?rev=1572224&r1=1572223&r2=1572224&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/cproton.i (original)
+++ qpid/proton/trunk/proton-c/include/proton/cproton.i Wed Feb 26 20:04:56 2014
@@ -1105,6 +1105,12 @@ typedef long long int int64_t;
listener != NULL;
}
+%contract pn_listener_set_context(pn_listener_t *listener, void *context)
+{
+ require:
+ listener != NULL;
+}
+
%contract pn_listener_close(pn_listener_t *listener)
{
require:
@@ -1173,7 +1179,6 @@ typedef long long int int64_t;
{
require:
ctor != NULL;
- connection != NULL;
}
%contract pn_connector_context(pn_connector_t *connector)
Modified: qpid/proton/trunk/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/posix/driver.c?rev=1572224&r1=1572223&r2=1572224&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/posix/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/posix/driver.c Wed Feb 26 20:04:56 2014
@@ -37,6 +37,7 @@
#include <proton/sasl.h>
#include <proton/ssl.h>
#include <proton/util.h>
+#include <proton/object.h>
#include "../util.h"
#include "../platform.h"
#include "../ssl/ssl-internal.h"
@@ -449,8 +450,15 @@ pn_transport_t *pn_connector_transport(p
void pn_connector_set_connection(pn_connector_t *ctor, pn_connection_t *connection)
{
if (!ctor) return;
+ if (ctor->connection) {
+ pn_decref(ctor->connection);
+ pn_transport_unbind(ctor->transport);
+ }
ctor->connection = connection;
- pn_transport_bind(ctor->transport, connection);
+ if (ctor->connection) {
+ pn_incref(ctor->connection);
+ pn_transport_bind(ctor->transport, connection);
+ }
if (ctor->transport) pn_transport_trace(ctor->transport, ctor->trace);
}
@@ -503,9 +511,10 @@ void pn_connector_free(pn_connector_t *c
if (!ctor) return;
if (ctor->driver) pn_driver_remove_connector(ctor->driver, ctor);
- ctor->connection = NULL;
pn_transport_free(ctor->transport);
ctor->transport = NULL;
+ if (ctor->connection) pn_decref(ctor->connection);
+ ctor->connection = NULL;
free(ctor);
}
Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1572224&r1=1572223&r2=1572224&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Wed Feb 26 20:04:56 2014
@@ -324,6 +324,9 @@ def pn_sender(ssn, name):
def pn_receiver(ssn, name):
return wrap(ssn.impl.receiver(name), pn_link_wrapper)
+def pn_session_free(ssn):
+ ssn.impl = None
+
TERMINUS_TYPES_J2P = {
Source: PN_SOURCE,
Target: PN_TARGET,
@@ -648,6 +651,9 @@ def pn_link_advance(link):
def pn_link_current(link):
return wrap(link.impl.current(), pn_delivery_wrapper)
+def pn_link_free(link):
+ link.impl = None
+
def pn_work_head(conn):
return wrap(conn.impl.getWorkHead(), pn_delivery_wrapper)
@@ -831,6 +837,9 @@ def pn_delivery_update(dlv, state):
dlv.local.type = state
dlv.impl.disposition(dlv.local.encode())
+def pn_delivery_link(dlv):
+ return wrap(dlv.impl.getLink(), pn_link_wrapper)
+
def pn_delivery_settle(dlv):
dlv.impl.settle()
Modified: qpid/proton/trunk/tests/python/proton_tests/common.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/common.py?rev=1572224&r1=1572223&r2=1572224&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/common.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/common.py Wed Feb 26 20:04:56 2014
@@ -93,7 +93,7 @@ def isSSLPresent():
# SSL libraries not installed
return False
-class Test:
+class Test(object):
def __init__(self, name):
self.name = name
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1572224&r1=1572223&r2=1572224&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Wed Feb 26 20:04:56 2014
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,11 +17,15 @@
# under the License.
#
-import os, common
+import os, common, gc
from time import time, sleep
from proton import *
from common import pump
+# older versions of gc do not provide the garbage list
+if not hasattr(gc, "garbage"):
+ gc.garbage=[]
+
# future test areas
# + different permutations of setup
# - creating deliveries and calling input/output before opening the session/link
@@ -84,7 +88,11 @@ class Test(common.Test):
return snd, rcv
def cleanup(self):
- pass
+ # release resources created by this class
+ for w in self._wires:
+ w[0]._transport = None
+ w[2]._transport = None
+ self._wires = []
def pump(self, buffer_size=OUTPUT_SIZE):
for c1, t1, c2, t2 in self._wires:
@@ -93,10 +101,19 @@ class Test(common.Test):
class ConnectionTest(Test):
def setup(self):
+ gc.enable()
self.c1, self.c2 = self.connection()
+ def cleanup(self):
+ # release resources created by this class
+ super(ConnectionTest, self).cleanup()
+ self.c1 = None
+ self.c2 = None
+
def teardown(self):
self.cleanup()
+ gc.collect()
+ assert not gc.garbage
def test_open_close(self):
assert self.c1.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
@@ -203,16 +220,43 @@ class ConnectionTest(Test):
self.pump()
assert self.c2._transport.remote_channel_max == value, (self.c2._transport.remote_channel_max, value)
+ def test_cleanup(self):
+ self.c1.open()
+ self.c2.open()
+ self.pump()
+ assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ t1 = self.c1._transport
+ t2 = self.c2._transport
+ c2 = self.c2
+ self.c1.close()
+ # release all references to C1, except that held by the transport
+ self.cleanup()
+ gc.collect()
+ # transport should flush last state from C1:
+ pump(t1, t2)
+ assert c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
+
class SessionTest(Test):
def setup(self):
+ gc.enable()
self.c1, self.c2 = self.connection()
self.ssn = self.c1.session()
self.c1.open()
self.c2.open()
+ def cleanup(self):
+ # release resources created by this class
+ super(SessionTest, self).cleanup()
+ self.c1 = None
+ self.c2 = None
+ self.ssn = None
+
def teardown(self):
self.cleanup()
+ gc.collect()
+ assert not gc.garbage
def test_open_close(self):
assert self.ssn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
@@ -314,13 +358,38 @@ class SessionTest(Test):
rcond = ssn.remote_condition
assert rcond == cond, (rcond, cond)
+ def test_cleanup(self):
+ snd, rcv = self.link("test-link")
+ snd.open()
+ rcv.open()
+ self.pump()
+ snd_ssn = snd.session
+ rcv_ssn = rcv.session
+ assert rcv_ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ self.ssn = None
+ snd_ssn.close()
+ snd_ssn.free()
+ del snd_ssn
+ gc.collect()
+ self.pump()
+ assert rcv_ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
+
class LinkTest(Test):
def setup(self):
+ gc.enable()
self.snd, self.rcv = self.link("test-link")
+ def cleanup(self):
+ # release resources created by this class
+ super(LinkTest, self).cleanup()
+ self.snd = None
+ self.rcv = None
+
def teardown(self):
self.cleanup()
+ gc.collect()
+ assert not gc.garbage
def test_open_close(self):
assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
@@ -535,6 +604,18 @@ class LinkTest(Test):
assert self.snd.remote_rcv_settle_mode == Link.RCV_SECOND
assert self.rcv.remote_snd_settle_mode == Link.SND_UNSETTLED
+ def test_cleanup(self):
+ snd, rcv = self.link("test-link")
+ snd.open()
+ rcv.open()
+ self.pump()
+ assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ snd.close()
+ snd.free()
+ del snd
+ gc.collect()
+ self.pump()
+ assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
class TerminusConfig:
@@ -574,6 +655,7 @@ class TerminusConfig:
class TransferTest(Test):
def setup(self):
+ gc.enable()
self.snd, self.rcv = self.link("test-link")
self.c1 = self.snd.session.connection
self.c2 = self.rcv.session.connection
@@ -581,8 +663,18 @@ class TransferTest(Test):
self.rcv.open()
self.pump()
+ def cleanup(self):
+ # release resources created by this class
+ super(TransferTest, self).cleanup()
+ self.c1 = None
+ self.c2 = None
+ self.snd = None
+ self.rcv = None
+
def teardown(self):
self.cleanup()
+ gc.collect()
+ assert not gc.garbage
def test_work_queue(self):
assert self.c1.work_head is None
@@ -667,11 +759,11 @@ class TransferTest(Test):
assert sd.updated
sd.update(Delivery.ACCEPTED)
- sd.settle()
self.pump()
assert sd.local_state == rd.remote_state == Delivery.ACCEPTED
+ sd.settle()
def test_delivery_id_ordering(self):
self.rcv.flow(1024)
@@ -710,10 +802,11 @@ class TransferTest(Test):
#handle all disposition changes to sent messages
d = self.c1.work_head
while d:
+ next_d = d.work_next
if d.updated:
d.update(Delivery.ACCEPTED)
d.settle()
- d = d.work_next
+ d = next_d
#submit some more deliveries
for m in range(1450, 1500):
@@ -737,12 +830,47 @@ class TransferTest(Test):
rd.update(Delivery.ACCEPTED)
rd.settle()
+ def test_cleanup(self):
+ self.rcv.flow(10)
+ self.pump()
+
+ for x in range(10):
+ self.snd.delivery("tag%d" % x)
+ msg = "this is a test"
+ n = self.snd.send(msg)
+ assert n == len(msg)
+ assert self.snd.advance()
+ self.snd.close()
+ self.snd.free()
+ self.snd = None
+ gc.collect()
+
+ self.pump()
+
+ for x in range(10):
+ rd = self.rcv.current
+ assert rd is not None
+ assert rd.tag == "tag%d" % x
+ rmsg = self.rcv.recv(1024)
+ assert self.rcv.advance()
+ assert rmsg == msg
+ # close of snd should've settled:
+ assert rd.settled
+ rd.settle()
class MaxFrameTransferTest(Test):
def setup(self):
pass
+ def cleanup(self):
+ # release resources created by this class
+ super(MaxFrameTransferTest, self).cleanup()
+ self.c1 = None
+ self.c2 = None
+ self.snd = None
+ self.rcv = None
+
def teardown(self):
self.cleanup()
@@ -860,6 +988,14 @@ class IdleTimeoutTest(Test):
def setup(self):
pass
+ def cleanup(self):
+ # release resources created by this class
+ super(IdleTimeoutTest, self).cleanup()
+ self.snd = None
+ self.rcv = None
+ self.c1 = None
+ self.c2 = None
+
def teardown(self):
self.cleanup()
@@ -961,6 +1097,15 @@ class CreditTest(Test):
self.rcv.open()
self.pump()
+ def cleanup(self):
+ # release resources created by this class
+ super(CreditTest, self).cleanup()
+ self.c1 = None
+ self.snd = None
+ self.c2 = None
+ self.rcv2 = None
+ self.snd2 = None
+
def teardown(self):
self.cleanup()
@@ -1475,6 +1620,15 @@ class SettlementTest(Test):
self.rcv.open()
self.pump()
+ def cleanup(self):
+ # release resources created by this class
+ super(SettlementTest, self).cleanup()
+ self.c1 = None
+ self.snd = None
+ self.c2 = None
+ self.rcv2 = None
+ self.snd2 = None
+
def teardown(self):
self.cleanup()
@@ -1586,6 +1740,12 @@ class PipelineTest(Test):
def setup(self):
self.c1, self.c2 = self.connection()
+ def cleanup(self):
+ # release resources created by this class
+ super(PipelineTest, self).cleanup()
+ self.c1 = None
+ self.c2 = None
+
def teardown(self):
self.cleanup()
@@ -1911,6 +2071,9 @@ class DeliveryTest(Test):
class EventTest(Test):
+ def teardown(self):
+ self.cleanup()
+
def list(self, collector):
result = []
while True:
@@ -1983,8 +2146,6 @@ class EventTest(Test):
self.expect(coll, Event.LINK_STATE, Event.DELIVERY)
def testDeliveryEventsDisp(self):
- # XXX: we can't let coll go out of scope or the connection will be
- # pointing to garbage
snd, rcv, coll = self.testFlowEvents()
snd.open()
dlv = snd.delivery("delivery")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org