You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/01/17 15:26:03 UTC
[1/2] qpid-proton git commit: made Selectable binding not depend on messenger
Repository: qpid-proton
Updated Branches:
refs/heads/master d3302be0c -> 5a10b7803
made Selectable binding not depend on messenger
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/03ecbaa8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/03ecbaa8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/03ecbaa8
Branch: refs/heads/master
Commit: 03ecbaa8e61eec67346a8974053acfe58ab33b89
Parents: d3302be
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Sat Jan 17 07:44:58 2015 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Sat Jan 17 07:44:58 2015 -0500
----------------------------------------------------------------------
proton-c/bindings/python/proton/__init__.py | 51 ++++++++----------------
proton-c/include/proton/selectable.h | 3 ++
proton-c/src/messenger/messenger.c | 3 ++
proton-c/src/reactor/acceptor.c | 1 -
proton-c/src/reactor/connection.c | 1 -
proton-c/src/reactor/reactor.c | 10 ++++-
proton-c/src/selectable.c | 17 +++++++-
tests/python/proton_tests/messenger.py | 2 +-
8 files changed, 47 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/03ecbaa8/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 98cb9d7..ec1894d 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -748,16 +748,7 @@ first message.
self._check(pn_messenger_rewrite(self._mng, pattern, address))
def selectable(self):
- impl = pn_messenger_selectable(self._mng)
- if impl:
- fd = pn_selectable_get_fd(impl)
- sel = self._selectables.get(fd, None)
- if sel is None:
- sel = Selectable(self, impl)
- self._selectables[fd] = sel
- return sel
- else:
- return None
+ return Selectable.wrap(pn_messenger_selectable(self._mng))
@property
def deadline(self):
@@ -1158,14 +1149,22 @@ class Subscription(object):
_DEFAULT = object()
-class Selectable(object):
+class Selectable(Wrapper):
- def __init__(self, messenger, impl):
- self.messenger = messenger
- self._impl = impl
+ @staticmethod
+ def wrap(impl):
+ if impl is None:
+ return None
+ else:
+ return Selectable(impl)
+
+ def __init__(self, impl):
+ Wrapper.__init__(self, impl, pn_selectable_attachments)
+
+ def _init(self):
+ pass
def fileno(self, fd = _DEFAULT):
- if not self._impl: raise ValueError("selectable freed")
if fd is _DEFAULT:
return pn_selectable_get_fd(self._impl)
elif fd is None:
@@ -1174,27 +1173,22 @@ class Selectable(object):
pn_selectable_set_fd(self._impl, fd)
def _is_reading(self):
- if not self._impl: raise ValueError("selectable freed")
return pn_selectable_is_reading(self._impl)
def _set_reading(self, val):
- if not self._impl: raise ValueError("selectable freed")
pn_selectable_set_reading(self._impl, bool(val))
reading = property(_is_reading, _set_reading)
def _is_writing(self):
- if not self._impl: raise ValueError("selectable freed")
return pn_selectable_is_writing(self._impl)
def _set_writing(self, val):
- if not self._impl: raise ValueError("selectable freed")
pn_selectable_set_writing(self._impl, bool(val))
writing = property(_is_writing, _set_writing)
def _get_deadline(self):
- if not self._impl: raise ValueError("selectable freed")
tstamp = pn_selectable_get_deadline(self._impl)
if tstamp:
return millis2secs(tstamp)
@@ -1202,29 +1196,23 @@ class Selectable(object):
return None
def _set_deadline(self, deadline):
- if not self._impl: raise ValueError("selectable freed")
pn_selectable_set_deadline(self._impl, secs2millis(deadline))
deadline = property(_get_deadline, _set_deadline)
def readable(self):
- if not self._impl: raise ValueError("selectable freed")
pn_selectable_readable(self._impl)
def writable(self):
- if not self._impl: raise ValueError("selectable freed")
pn_selectable_writable(self._impl)
def expired(self):
- if not self._impl: raise ValueError("selectable freed")
pn_selectable_expired(self._impl)
def _is_registered(self):
- if not self._impl: raise ValueError("selectable freed")
return pn_selectable_is_registered(self._impl)
def _set_registered(self, registered):
- if not self._impl: raise ValueError("selectable freed")
pn_selectable_set_registered(self._impl, registered)
registered = property(_is_registered, _set_registered,
@@ -1235,17 +1223,10 @@ indicate whether the fd has been registered or not.
@property
def is_terminal(self):
- if not self._impl: return True
return pn_selectable_is_terminal(self._impl)
- def free(self):
- if self._impl:
- del self.messenger._selectables[self.fileno()]
- pn_selectable_free(self._impl)
- self._impl = None
-
- def __del__(self):
- self.free()
+ def release(self):
+ pn_selectable_release(self._impl)
class DataException(ProtonException):
"""
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/03ecbaa8/proton-c/include/proton/selectable.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/selectable.h b/proton-c/include/proton/selectable.h
index 615b72b..253f430 100644
--- a/proton-c/include/proton/selectable.h
+++ b/proton-c/include/proton/selectable.h
@@ -96,6 +96,7 @@ PN_EXTERN pn_selectable_t *pn_selectable(void);
PN_EXTERN void pn_selectable_on_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *));
PN_EXTERN void pn_selectable_on_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *));
PN_EXTERN void pn_selectable_on_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *));
+PN_EXTERN void pn_selectable_on_release(pn_selectable_t *sel, void (*release)(pn_selectable_t *));
PN_EXTERN void pn_selectable_on_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *));
PN_EXTERN pn_record_t *pn_selectable_attachments(pn_selectable_t *sel);
@@ -214,6 +215,8 @@ PN_EXTERN bool pn_selectable_is_terminal(pn_selectable_t *selectable);
*/
PN_EXTERN void pn_selectable_terminate(pn_selectable_t *selectable);
+PN_EXTERN void pn_selectable_release(pn_selectable_t *selectable);
+
/**
* Free a selectable object.
*
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/03ecbaa8/proton-c/src/messenger/messenger.c
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c
index ec3fd3d..5fc894f 100644
--- a/proton-c/src/messenger/messenger.c
+++ b/proton-c/src/messenger/messenger.c
@@ -408,6 +408,7 @@ static pn_listener_ctx_t *pn_listener_ctx(pn_messenger_t *messenger,
pn_selectable_t *selectable = pn_selectable();
pn_selectable_set_reading(selectable, true);
pn_selectable_on_readable(selectable, pni_listener_readable);
+ pn_selectable_on_release(selectable, pn_selectable_free);
pn_selectable_on_finalize(selectable, pni_listener_finalize);
pn_selectable_set_fd(selectable, socket);
pni_selectable_set_context(selectable, ctx);
@@ -449,6 +450,7 @@ static pn_connection_ctx_t *pn_connection_ctx(pn_messenger_t *messenger,
pn_selectable_on_readable(sel, pni_connection_readable);
pn_selectable_on_writable(sel, pni_connection_writable);
pn_selectable_on_expired(sel, pni_connection_expired);
+ pn_selectable_on_release(sel, pn_selectable_free);
pn_selectable_on_finalize(sel, pni_connection_finalize);
pn_selectable_set_fd(ctx->selectable, sock);
pni_selectable_set_context(ctx->selectable, ctx);
@@ -574,6 +576,7 @@ pn_messenger_t *pn_messenger(const char *name)
m->interruptor = pn_selectable();
pn_selectable_set_reading(m->interruptor, true);
pn_selectable_on_readable(m->interruptor, pni_interruptor_readable);
+ pn_selectable_on_release(m->interruptor, pn_selectable_free);
pn_selectable_on_finalize(m->interruptor, pni_interruptor_finalize);
pn_list_add(m->pending, m->interruptor);
m->interrupted = false;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/03ecbaa8/proton-c/src/reactor/acceptor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c
index a9b7dab..4be9543 100644
--- a/proton-c/src/reactor/acceptor.c
+++ b/proton-c/src/reactor/acceptor.c
@@ -57,7 +57,6 @@ pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, cons
pn_selectable_set_fd(sel, socket);
pn_selectable_on_readable(sel, pni_acceptor_readable);
pn_selectable_on_finalize(sel, pni_acceptor_finalize);
- pni_selectable_set_context(sel, reactor);
pni_record_init_reactor(pn_selectable_attachments(sel), reactor);
pni_record_init_handler(pn_selectable_attachments(sel), handler);
pn_selectable_set_reading(sel, true);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/03ecbaa8/proton-c/src/reactor/connection.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c
index 0987e41..86b84e9 100644
--- a/proton-c/src/reactor/connection.c
+++ b/proton-c/src/reactor/connection.c
@@ -191,7 +191,6 @@ pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socke
pn_selectable_on_writable(sel, pni_connection_writable);
pn_selectable_on_expired(sel, pni_connection_expired);
pn_selectable_on_finalize(sel, pni_connection_finalize);
- pni_selectable_set_context(sel, reactor);
pn_record_t *record = pn_selectable_attachments(sel);
pn_record_def(record, PN_TRANCTX, PN_OBJECT);
pn_record_set(record, PN_TRANCTX, transport);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/03ecbaa8/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index c3c5098..f654c4f 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -99,7 +99,6 @@ static void pni_timer_expired(pn_selectable_t *sel) {
pn_selectable_t *pni_selectable_timer(pn_reactor_t *reactor) {
pn_selectable_t *sel = pn_reactor_selectable(reactor);
pn_selectable_on_expired(sel, pni_timer_expired);
- pni_selectable_set_context(sel, reactor);
pn_record_t *record = pn_selectable_attachments(sel);
pn_record_def(record, 0x1, PN_OBJECT);
pn_timer_t *timer = pn_timer(reactor->collector);
@@ -156,11 +155,18 @@ pn_list_t *pn_reactor_children(pn_reactor_t *reactor) {
return reactor->children;
}
+static void pni_selectable_release(pn_selectable_t *selectable) {
+ pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(selectable);
+ pn_list_remove(reactor->children, selectable);
+}
+
pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor) {
assert(reactor);
pn_selectable_t *sel = pn_selectable();
+ pni_selectable_set_context(sel, reactor);
pn_selector_add(reactor->selector, sel);
pn_list_add(reactor->children, sel);
+ pn_selectable_on_release(sel, pni_selectable_release);
pn_decref(sel);
return sel;
}
@@ -353,7 +359,7 @@ bool pn_reactor_work(pn_reactor_t *reactor, int timeout) {
}
if (pn_selectable_is_terminal(sel)) {
pn_selector_remove(reactor->selector, sel);
- pn_list_remove(reactor->children, sel);
+ pn_selectable_release(sel);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/03ecbaa8/proton-c/src/selectable.c
----------------------------------------------------------------------
diff --git a/proton-c/src/selectable.c b/proton-c/src/selectable.c
index 1f295ff..ec3c193 100644
--- a/proton-c/src/selectable.c
+++ b/proton-c/src/selectable.c
@@ -47,6 +47,7 @@ struct pn_selectable_t {
void (*readable)(pn_selectable_t *);
void (*writable)(pn_selectable_t *);
void (*expired)(pn_selectable_t *);
+ void (*release) (pn_selectable_t *);
void (*finalize)(pn_selectable_t *);
pn_collector_t *collector;
pn_timestamp_t deadline;
@@ -64,6 +65,7 @@ void pn_selectable_initialize(pn_selectable_t *sel)
sel->readable = NULL;
sel->writable = NULL;
sel->expired = NULL;
+ sel->release = NULL;
sel->finalize = NULL;
sel->collector = NULL;
sel->deadline = 0;
@@ -137,6 +139,11 @@ void pn_selectable_on_expired(pn_selectable_t *sel, void (*expired)(pn_selectabl
sel->expired = expired;
}
+void pn_selectable_on_release(pn_selectable_t *sel, void (*release)(pn_selectable_t *)) {
+ assert(sel);
+ sel->release = release;
+}
+
void pn_selectable_on_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *)) {
assert(sel);
sel->finalize = finalize;
@@ -230,9 +237,17 @@ void pn_selectable_terminate(pn_selectable_t *selectable)
selectable->terminal = true;
}
+void pn_selectable_release(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ if (selectable->release) {
+ selectable->release(selectable);
+ }
+}
+
void pn_selectable_free(pn_selectable_t *selectable)
{
- pn_free(selectable);
+ pn_decref(selectable);
}
static void pni_readable(pn_selectable_t *selectable) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/03ecbaa8/tests/python/proton_tests/messenger.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/messenger.py b/tests/python/proton_tests/messenger.py
index b22c16d..16e605d 100644
--- a/tests/python/proton_tests/messenger.py
+++ b/tests/python/proton_tests/messenger.py
@@ -958,7 +958,7 @@ class Pump:
for sel in self.selectables[:]:
if sel.is_terminal:
- sel.free()
+ sel.release()
self.selectables.remove(sel)
else:
if sel.reading:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: python bindings for reactor selectables
Posted by rh...@apache.org.
python bindings for reactor selectables
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5a10b780
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5a10b780
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5a10b780
Branch: refs/heads/master
Commit: 5a10b78034907c322508ee24d770b65babe53ea6
Parents: 03ecbaa
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Sat Jan 17 09:25:42 2015 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Sat Jan 17 09:25:42 2015 -0500
----------------------------------------------------------------------
proton-c/bindings/python/proton/__init__.py | 7 ++++++-
proton-c/bindings/python/proton/reactors.py | 10 +++++++++-
proton-c/include/proton/cproton.i | 1 +
proton-c/src/reactor/reactor.c | 5 +++++
.../main/java/org/apache/qpid/proton/engine/Event.java | 6 +++++-
proton-j/src/main/resources/cengine.py | 3 +++
6 files changed, 29 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a10b780/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index ec1894d..936d61d 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -3258,7 +3258,8 @@ wrappers = {
"pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
"pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
"pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
- "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x))
+ "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
+ "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
}
class Collector:
@@ -3353,6 +3354,10 @@ class Event(Wrapper, EventBase):
TRANSPORT_TAIL_CLOSED = EventType(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
TRANSPORT_CLOSED = EventType(PN_TRANSPORT_CLOSED, "on_transport_closed")
+ SELECTABLE_READABLE = EventType(PN_SELECTABLE_READABLE, "on_selectable_readable")
+ SELECTABLE_WRITABLE = EventType(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
+ SELECTABLE_EXPIRED = EventType(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
+
@staticmethod
def wrap(impl):
if impl is None:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a10b780/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index 2a4fd5a..b70bf7e 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -835,7 +835,7 @@ class Container(object):
def do_work(self, timeout=None):
return self.loop.do_work(timeout)
-from proton import WrappedHandler, _chandler, Connection, secs2millis
+from proton import WrappedHandler, _chandler, Connection, secs2millis, Selectable
from wrapper import Wrapper
from cproton import *
@@ -903,6 +903,14 @@ class Reactor(Wrapper):
pn_decref(impl)
return result
+ def selectable(self):
+ impl = pn_reactor_selectable(self._impl)
+ pn_selectable_collect(impl, pn_reactor_collector(self._impl))
+ return Selectable.wrap(impl)
+
+ def update(self, sel):
+ pn_reactor_update(self._impl, sel._impl)
+
from proton import wrappers as _wrappers
_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a10b780/proton-c/include/proton/cproton.i
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i
index 17a8c37..bf800a0 100644
--- a/proton-c/include/proton/cproton.i
+++ b/proton-c/include/proton/cproton.i
@@ -1396,6 +1396,7 @@ typedef unsigned long int uintptr_t;
pn_transport_t *pn_cast_pn_transport(void *x) { return (pn_transport_t *) x; }
pn_reactor_t *pn_cast_pn_reactor(void *x) { return (pn_reactor_t *) x; }
pn_task_t *pn_cast_pn_task(void *x) { return (pn_task_t *) x; }
+ pn_selectable_t *pn_cast_pn_selectable(void *x) { return (pn_selectable_t *) x; }
%}
%include "proton/url.h"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a10b780/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index f654c4f..0c30fce 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -270,6 +270,11 @@ pn_reactor_t *pn_event_reactor(pn_event_t *event) {
pn_record_t *record = pn_connection_attachments(conn);
return pni_record_get_reactor(record);
}
+ case CID_pn_selectable:
+ {
+ pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
+ return (pn_reactor_t *) pni_selectable_get_context(sel);
+ }
default:
return NULL;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a10b780/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
index 36d1a7b..1970085 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
@@ -67,7 +67,11 @@ public interface Event
TRANSPORT_ERROR,
TRANSPORT_HEAD_CLOSED,
TRANSPORT_TAIL_CLOSED,
- TRANSPORT_CLOSED
+ TRANSPORT_CLOSED,
+
+ SELECTABLE_READABLE,
+ SELECTABLE_WRITABLE,
+ SELECTABLE_EXPIRED
}
Type getType();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a10b780/proton-j/src/main/resources/cengine.py
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/cengine.py b/proton-j/src/main/resources/cengine.py
index ad70587..5c0de79 100644
--- a/proton-j/src/main/resources/cengine.py
+++ b/proton-j/src/main/resources/cengine.py
@@ -984,6 +984,9 @@ PN_TRANSPORT_ERROR = Event.Type.TRANSPORT_ERROR
PN_TRANSPORT_HEAD_CLOSED = Event.Type.TRANSPORT_HEAD_CLOSED
PN_TRANSPORT_TAIL_CLOSED = Event.Type.TRANSPORT_TAIL_CLOSED
PN_TRANSPORT_CLOSED = Event.Type.TRANSPORT_CLOSED
+PN_SELECTABLE_READABLE = Event.Type.SELECTABLE_READABLE
+PN_SELECTABLE_WRITABLE = Event.Type.SELECTABLE_WRITABLE
+PN_SELECTABLE_EXPIRED = Event.Type.SELECTABLE_EXPIRED
def pn_collector():
return Proton.collector()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org