You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2015/01/21 10:24:53 UTC
qpid-proton git commit: PROTON-800: proton-c reactor fixes for
Windows, destructor ordering and selectables handling
Repository: qpid-proton
Updated Branches:
refs/heads/master e2be27cef -> 6df8ad351
PROTON-800: proton-c reactor fixes for Windows, destructor ordering and selectables handling
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6df8ad35
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6df8ad35
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6df8ad35
Branch: refs/heads/master
Commit: 6df8ad3511e81b3b674e857632af35dd2e3e5883
Parents: e2be27c
Author: Clifford Jansen <cl...@apache.org>
Authored: Wed Jan 21 01:23:47 2015 -0800
Committer: Clifford Jansen <cl...@apache.org>
Committed: Wed Jan 21 01:23:47 2015 -0800
----------------------------------------------------------------------
proton-c/src/reactor/reactor.c | 4 +-
proton-c/src/windows/iocp.c | 11 +++++-
proton-c/src/windows/iocp.h | 2 +
proton-c/src/windows/selector.c | 72 ++++++++++++++++++++----------------
4 files changed, 55 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index bdee2de..b9346d2 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -76,12 +76,12 @@ static void pn_reactor_initialize(pn_reactor_t *reactor) {
static void pn_reactor_finalize(pn_reactor_t *reactor) {
pn_decref(reactor->attachments);
- pn_decref(reactor->selector);
- pn_decref(reactor->io);
pn_decref(reactor->collector);
pn_decref(reactor->handler);
pn_decref(reactor->children);
pn_decref(reactor->timer);
+ pn_decref(reactor->selector);
+ pn_decref(reactor->io);
}
#define pn_reactor_hashcode NULL
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/windows/iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/iocp.c b/proton-c/src/windows/iocp.c
index 6acff41..3feec01 100644
--- a/proton-c/src/windows/iocp.c
+++ b/proton-c/src/windows/iocp.c
@@ -733,7 +733,6 @@ static iocpdesc_t *pni_iocpdesc(pn_socket_t s)
{
static const pn_cid_t CID_pni_iocpdesc = CID_pn_void;
static pn_class_t clazz = PN_CLASS(pni_iocpdesc);
- assert (s != INVALID_SOCKET);
iocpdesc_t *iocpd = (iocpdesc_t *) pn_class_new(&clazz, sizeof(iocpdesc_t));
assert(iocpd);
iocpd->socket = s;
@@ -749,6 +748,7 @@ static bool is_listener_socket(pn_socket_t s)
}
iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) {
+ assert (s != INVALID_SOCKET);
assert(!pni_iocpdesc_map_get(iocp, s));
bool listening = is_listener_socket(s);
iocpdesc_t *iocpd = pni_iocpdesc(s);
@@ -767,6 +767,15 @@ iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) {
return iocpd;
}
+iocpdesc_t *pni_deadline_desc(iocp_t *iocp) {
+ // Non-IO descriptor for selector deadlines. Do not add it to the iocpdesc map or
+ // zombie list. The selector is responsible for freeing (decref'ing) the object.
+ iocpdesc_t *iocpd = pni_iocpdesc(PN_INVALID_SOCKET);
+ iocpd->iocp = iocp;
+ iocpd->deadline_desc = true;
+ return iocpd;
+}
+
// === Fast lookup of a socket's iocpdesc_t
iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/windows/iocp.h
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/iocp.h b/proton-c/src/windows/iocp.h
index 91ded50..adccaab 100644
--- a/proton-c/src/windows/iocp.h
+++ b/proton-c/src/windows/iocp.h
@@ -74,6 +74,7 @@ struct iocpdesc_t {
bool closing; // pn_close called
bool read_closed; // EOF or read error
bool write_closed; // shutdown sent or write error
+ bool deadline_desc; // Socket-less deadline descriptor for selectors
pn_selector_t *selector;
pn_selectable_t *selectable;
int events;
@@ -104,6 +105,7 @@ struct write_result_t {
iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external);
iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s);
+iocpdesc_t *pni_deadline_desc(iocp_t *);
void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s);
void pni_iocpdesc_map_push(iocpdesc_t *iocpd);
void pni_iocpdesc_start(iocpdesc_t *iocpd);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/windows/selector.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/selector.c b/proton-c/src/windows/selector.c
index a59dc9c..6bd8d8c 100644
--- a/proton-c/src/windows/selector.c
+++ b/proton-c/src/windows/selector.c
@@ -44,8 +44,6 @@ static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t);
struct pn_selector_t {
iocp_t *iocp;
- pn_timestamp_t *deadlines;
- size_t capacity;
pn_list_t *selectables;
pn_list_t *iocp_descriptors;
size_t current;
@@ -62,8 +60,6 @@ void pn_selector_initialize(void *obj)
{
pn_selector_t *selector = (pn_selector_t *) obj;
selector->iocp = NULL;
- selector->deadlines = NULL;
- selector->capacity = 0;
selector->selectables = pn_list(PN_WEAKREF, 0);
selector->iocp_descriptors = pn_list(PN_OBJECT, 0);
selector->current = 0;
@@ -79,7 +75,6 @@ void pn_selector_initialize(void *obj)
void pn_selector_finalize(void *obj)
{
pn_selector_t *selector = (pn_selector_t *) obj;
- free(selector->deadlines);
pn_free(selector->selectables);
pn_free(selector->iocp_descriptors);
pn_error_free(selector->error);
@@ -110,34 +105,13 @@ void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
assert(selectable);
assert(pni_selectable_get_index(selectable) < 0);
pn_socket_t sock = pn_selectable_get_fd(selectable);
-
iocpdesc_t *iocpd = NULL;
- if (sock != INVALID_SOCKET) {
- iocpd = pni_iocpdesc_map_get(selector->iocp, sock);
- if (!iocpd) {
- // Socket created outside proton. Hook it up to iocp.
- iocpd = pni_iocpdesc_create(selector->iocp, sock, true);
- pni_iocpdesc_start(iocpd);
- } else {
- assert(iocpd->iocp == selector->iocp);
- }
- }
if (pni_selectable_get_index(selectable) < 0) {
pn_list_add(selector->selectables, selectable);
- pn_list_add(selector->iocp_descriptors, iocpd);
+ pn_list_add(selector->iocp_descriptors, NULL);
size_t size = pn_list_size(selector->selectables);
-
- if (selector->capacity < size) {
- selector->deadlines = (pn_timestamp_t *) realloc(selector->deadlines, size*sizeof(pn_timestamp_t));
- selector->capacity = size;
- }
-
pni_selectable_set_index(selectable, size - 1);
- if (iocpd) {
- iocpd->selector = selector;
- iocpd->selectable = selectable;
- }
}
pn_selector_update(selector, selectable);
@@ -145,12 +119,48 @@ void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
{
+ // A selectable's fd may switch from PN_INVALID_SOCKET to a working socket between
+ // update calls. If a selectable without a valid socket has a deadline, we need
+ // a dummy iocpdesc_t to participate in the deadlines list.
int idx = pni_selectable_get_index(selectable);
assert(idx >= 0);
- selector->deadlines[idx] = pn_selectable_get_deadline(selectable);
-
+ pn_timestamp_t deadline = pn_selectable_get_deadline(selectable);
pn_socket_t sock = pn_selectable_get_fd(selectable);
iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
+
+ if (!iocpd && deadline && sock == PN_INVALID_SOCKET) {
+ iocpd = pni_deadline_desc(selector->iocp);
+ assert(iocpd);
+ pn_list_set(selector->iocp_descriptors, idx, iocpd);
+ pn_decref(iocpd); // life is solely tied to iocp_descriptors list
+ iocpd->selector = selector;
+ iocpd->selectable = selectable;
+ }
+ else if (iocpd && iocpd->deadline_desc && sock != PN_INVALID_SOCKET) {
+ // Switching to a real socket. Stop using a deadline descriptor.
+ deadlines_update(iocpd, 0);
+ // decref descriptor in list and pick up a real iocpd below
+ pn_list_set(selector->iocp_descriptors, idx, NULL);
+ iocpd = NULL;
+ }
+
+ // The selectable's socket may be set long after it has been added
+ if (!iocpd && sock != PN_INVALID_SOCKET) {
+ iocpd = pni_iocpdesc_map_get(selector->iocp, sock);
+ if (!iocpd) {
+ // Socket created outside proton. Hook it up to iocp.
+ iocpd = pni_iocpdesc_create(selector->iocp, sock, true);
+ assert(iocpd);
+ if (iocpd)
+ pni_iocpdesc_start(iocpd);
+ }
+ if (iocpd) {
+ pn_list_set(selector->iocp_descriptors, idx, iocpd);
+ iocpd->selector = selector;
+ iocpd->selectable = selectable;
+ }
+ }
+
if (iocpd) {
assert(sock == iocpd->socket || iocpd->closing);
int interests = 0;
@@ -160,11 +170,11 @@ void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
if (pn_selectable_is_writing(selectable)) {
interests |= PN_WRITABLE;
}
- if (selector->deadlines[idx]) {
+ if (deadline) {
interests |= PN_EXPIRED;
}
interests_update(iocpd, interests);
- deadlines_update(iocpd, selector->deadlines[idx]);
+ deadlines_update(iocpd, deadline);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org