You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/04/09 23:44:40 UTC
qpid-proton git commit: PROTON-1460: add pn_proactor_disconnect()
Repository: qpid-proton
Updated Branches:
refs/heads/master 2d3a9de8f -> 54923953a
PROTON-1460: add pn_proactor_disconnect()
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/54923953
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/54923953
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/54923953
Branch: refs/heads/master
Commit: 54923953a20a719b399de8c328b582a466690680
Parents: 2d3a9de
Author: Clifford Jansen <cl...@apache.org>
Authored: Sun Apr 9 16:42:43 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Sun Apr 9 16:42:43 2017 -0700
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 153 +++++++++++++++++++++++++++++--------
1 file changed, 120 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/54923953/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 511b95d..85ee99d 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -63,6 +63,11 @@
// First define a proactor mutex (pmutex) and timer mechanism (ptimer) to taste.
// ========================================================================
+// In general, all locks are to be held singly and briefly (possibly as spin locks).
+// Exception: psockets+proactor for pn_proactor_disconnect (convention: acquire
+// psocket first to avoid deadlock). TODO: revisit the exception and its
+// awkwardness in the code (additional mutex? different type?).
+
typedef pthread_mutex_t pmutex;
static void pmutex_init(pthread_mutex_t *pm){
pthread_mutexattr_t attr;
@@ -87,7 +92,7 @@ typedef enum {
LISTENER_IO,
PROACTOR_TIMER } epoll_type_t;
-// Context to use with epoll.
+// Data to use with epoll.
typedef struct epoll_extended_t {
psocket_t *psocket; // pconnection, listener, or NULL -> proactor
int fd;
@@ -275,8 +280,12 @@ static void pcontext_finalize(pcontext_t* ctx) {
/* common to connection and listener */
typedef struct psocket_t {
pn_proactor_t *proactor;
+ // Next 4 are protected by the proactor mutex
struct psocket_t* next; /* Protected by proactor.mutex */
struct psocket_t* prev; /* Protected by proactor.mutex */
+ bool disconnecting; /* pn_proactor_disconnect */
+ int disconnect_ops; /* ops remaining before disconnect complete */
+ // Remaining protected by the pconnection/listener mutex
int sockfd;
epoll_extended_t epoll_io;
bool is_conn;
@@ -294,8 +303,9 @@ struct pn_proactor_t {
epoll_extended_t epoll_wake;
pn_event_t *cached_event;
pn_event_batch_t batch;
- size_t interrupts; /* total pending interrupts */
+ size_t interrupts; /* total pending interrupts */
size_t deferred_interrupts; /* interrupts for current batch */
+ size_t disconnects_pending; /* unfinished proactor disconnects*/
bool inactive;
bool timer_expired;
bool timer_cancelled;
@@ -395,6 +405,8 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const ch
ps->proactor = p;
ps->next = NULL;
ps->prev = NULL;
+ ps->disconnecting = false;
+ ps->disconnect_ops = 0;
ps->is_conn = is_conn;
ps->closing = false;
ps->sockfd = -1;
@@ -456,7 +468,7 @@ struct pn_listener_t {
static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup);
static void listener_begin_close(pn_listener_t* l);
static void proactor_add(psocket_t *ps);
-static void proactor_remove(psocket_t *ps);
+static bool proactor_remove(psocket_t *ps);
static inline pconnection_t *as_pconnection(psocket_t* ps) {
return ps->is_conn ? (pconnection_t*)ps : NULL;
@@ -586,17 +598,25 @@ static inline bool pconnection_is_final(pconnection_t *pc) {
return !pc->current_arm && !pc->timer.pending_count && !pc->context.wake_ops;
}
+static void pconnection_final_free(pconnection_t *pc) {
+ pn_incref(pc); /* Make sure we don't do a circular free */
+ pn_connection_driver_destroy(&pc->driver);
+ pn_decref(pc);
+ /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
+}
+
// call without lock, but only if pconnection_is_final() is true
static void pconnection_cleanup(pconnection_t *pc) {
if (pc->psocket.sockfd != -1)
close(pc->psocket.sockfd);
- proactor_remove(&pc->psocket);
stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
ptimer_finalize(&pc->timer);
- pn_incref(pc); /* Make sure we don't do a circular free */
- pn_connection_driver_destroy(&pc->driver);
- pn_decref(pc);
- /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
+ lock(&pc->context.mutex);
+ bool can_free = proactor_remove(&pc->psocket);
+ unlock(&pc->context.mutex);
+ if (can_free)
+ pconnection_final_free(pc);
+ // else proactor_disconnect logic owns psocket and its final free
}
// Call with lock held or from forced_shutdown
@@ -1078,16 +1098,24 @@ static inline bool listener_can_free(pn_listener_t *l) {
!l->context.wake_ops;
}
+static inline void listener_final_free(pn_listener_t *l) {
+ pcontext_finalize(&l->context);
+ free(l);
+}
+
void pn_listener_free(pn_listener_t *l) {
// TODO: do we need a QPID DeletionManager equivalent to be safe from inbound connection (accept) epoll events?
- // TODO: handle external call by user
- proactor_remove(&l->psocket);
if (l) {
if (l->collector) pn_collector_free(l->collector);
if (!l->condition) pn_condition_free(l->condition);
if (!l->attachments) pn_free(l->attachments);
- pcontext_finalize(&l->context);
- free(l);
+ lock(&l->context.mutex);
+ bool can_free = proactor_remove(&l->psocket);
+ unlock(&l->context.mutex);
+ if (can_free) {
+ listener_final_free(l);
+ return;
+ } // else... proactor_disconnect logic has assumed ownership
}
}
@@ -1410,27 +1438,45 @@ static void proactor_add(psocket_t *ps) {
unlock(&p->context.mutex);
}
-static void proactor_remove(psocket_t *ps) {
+// call with psocket's mutex held
+// return true if safe for caller to free psocket
+static bool proactor_remove(psocket_t *ps) {
pn_proactor_t *p = ps->proactor;
lock(&p->context.mutex);
bool notify = false;
- if (ps->prev)
- ps->prev->next = ps->next;
- else {
- p->psockets = ps->next;
- ps->next = NULL;
- if (p->psockets)
- p->psockets->prev = NULL;
+ bool can_free = true;
+ if (ps->disconnecting) {
+ // No longer on psockets list
+ if (--ps->disconnect_ops == 0) {
+ if (--p->disconnects_pending == 0 && !p->psockets) {
+ p->inactive = true;
+ notify = wake(&p->context);
+ }
+ }
+ else // proactor_disconnect() still processing
+ can_free = false; // this psocket
}
- if (ps->next)
- ps->next->prev = ps->prev;
+ else {
+ // normal case
+ if (ps->prev)
+ ps->prev->next = ps->next;
+ else {
+ p->psockets = ps->next;
+ ps->next = NULL;
+ if (p->psockets)
+ p->psockets->prev = NULL;
+ }
+ if (ps->next)
+ ps->next->prev = ps->prev;
- if (!p->psockets && !p->shutting_down) {
- p->inactive = true;
- notify = wake(&p->context);
+ if (!p->psockets && !p->disconnects_pending && !p->shutting_down) {
+ p->inactive = true;
+ notify = wake(&p->context);
+ }
}
unlock(&p->context.mutex);
if (notify) wake_notify(&p->context);
+ return can_free;
}
static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p) {
@@ -1574,27 +1620,68 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
}
void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
- // TODO: make this boilerplate actually work without deadlocking
lock(&p->context.mutex);
- psocket_t *ps = p->psockets;
+ // Move the whole psockets list into a disconnecting state
+ psocket_t *disconnecting_psockets = p->psockets;
+ p->psockets = NULL;
+ // First pass: mark each psocket as disconnecting and update global pending count.
+ psocket_t *ps = disconnecting_psockets;
while (ps) {
+ ps->disconnecting = true;
+ ps->disconnect_ops = 2; // Second pass below and proactor_remove(), in any order.
+ p->disconnects_pending++;
+ ps = ps->next;
+ }
+ unlock(&p->context.mutex);
+ if (!disconnecting_psockets)
+ return;
+
+ // Second pass: different locking, close the psockets, free them if !disconnect_ops
+ bool notify = false;
+ for (ps = disconnecting_psockets; ps; ps = ps->next) {
+ bool do_free = false;
+ pmutex *ps_mutex = NULL;
pconnection_t *pc = as_pconnection(ps);
if (pc) {
- if (cond) {
- pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
- }
- pn_connection_driver_close(&pc->driver);
+ ps_mutex = &pc->context.mutex;
+ lock(ps_mutex);
+ if (!ps->closing) {
+ if (cond) {
+ pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+ }
+ pn_connection_driver_close(&pc->driver);
+ }
} else {
pn_listener_t *l = as_listener(ps);
- if (l) {
+ assert(l);
+ ps_mutex = &l->context.mutex;
+ lock(ps_mutex);
+ if (!ps->closing) {
if (cond) {
pn_condition_copy(pn_listener_condition(l), cond);
}
pn_listener_close(l);
}
}
+
+ lock(&p->context.mutex);
+ if (--ps->disconnect_ops == 0) {
+ do_free = true;
+ if (--p->disconnects_pending == 0 && !p->psockets) {
+ p->inactive = true;
+ notify = wake(&p->context);
+ }
+ }
+ unlock(&p->context.mutex);
+ unlock(ps_mutex);
+
+ if (do_free) {
+ if (pc) pconnection_final_free(pc);
+ else listener_final_free(as_listener(ps));
+ }
}
- unlock(&p->context.mutex);
+ if (notify)
+ wake_notify(&p->context);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org