You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/09 01:25:28 UTC
[05/50] [abbrv] qpid-proton git commit: PROTON-1460: epoll - thread safe use of driver on connection disconnect
PROTON-1460: epoll - thread safe use of driver on connection disconnect
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/19f345ac
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/19f345ac
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/19f345ac
Branch: refs/heads/go1
Commit: 19f345acce3748020c7b2f13b2f0937ed35170ec
Parents: 9c69b7d
Author: Clifford Jansen <cl...@apache.org>
Authored: Tue May 9 13:06:41 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Tue May 9 13:07:56 2017 -0700
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 45 ++++++++++++++++++++++++++++++--------
1 file changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/19f345ac/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index cea9a68..fae68e9 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -425,6 +425,8 @@ typedef struct pconnection_t {
bool server; /* accept, not connect */
bool tick_pending;
bool timer_armed;
+ bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
+ pn_condition_t *disconnect_condition;
ptimer_t timer; // TODO: review one timerfd per connection
// Following values only changed by (sole) working context:
uint32_t current_arm; // active epoll io events
@@ -580,6 +582,8 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
pc->wake_count = 0;
pc->tick_pending = false;
pc->timer_armed = false;
+ pc->queued_disconnect = false;
+ pc->disconnect_condition = NULL;
pc->current_arm = 0;
pc->connected = false;
@@ -609,6 +613,7 @@ static void pconnection_final_free(pconnection_t *pc) {
if (pc->addrinfo) {
freeaddrinfo(pc->addrinfo);
}
+ pn_condition_free(pc->disconnect_condition);
pn_incref(pc); /* Make sure we don't do a circular free */
pn_connection_driver_destroy(&pc->driver);
pn_decref(pc);
@@ -707,7 +712,7 @@ static inline void pconnection_rearm(pconnection_t *pc) {
}
static inline bool pconnection_work_pending(pconnection_t *pc) {
- if (pc->new_events || pc->wake_count || pc->tick_pending)
+ if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
return true;
if (!pc->read_blocked && !pconnection_rclosed(pc))
return true;
@@ -823,6 +828,16 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
retry:
+ if (pc->queued_disconnect) { // From pn_proactor_disconnect()
+ pc->queued_disconnect = false;
+ if (!pc->context.closing) {
+ if (pc->disconnect_condition) {
+ pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition);
+ }
+ pn_connection_driver_close(&pc->driver);
+ }
+ }
+
if (pconnection_has_event(pc)) {
unlock(&pc->context.mutex);
return &pc->batch;
@@ -1731,16 +1746,29 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
bool notify = false;
for (ctx = disconnecting_pcontexts; ctx; ctx = ctx ? ctx->next : NULL) {
bool do_free = false;
+ bool ctx_notify = true;
pmutex *ctx_mutex = NULL;
pconnection_t *pc = pcontext_pconnection(ctx);
if (pc) {
ctx_mutex = &pc->context.mutex;
lock(ctx_mutex);
if (!ctx->closing) {
- if (cond) {
- pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+ if (ctx->working) {
+ // Must defer
+ pc->queued_disconnect = true;
+ if (cond) {
+ if (!pc->disconnect_condition)
+ pc->disconnect_condition = pn_condition();
+ pn_condition_copy(pc->disconnect_condition, cond);
+ }
+ }
+ else {
+ // No conflicting working context.
+ if (cond) {
+ pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+ }
+ pn_connection_driver_close(&pc->driver);
}
- pn_connection_driver_close(&pc->driver);
}
} else {
pn_listener_t *l = pcontext_listener(ctx);
@@ -1758,16 +1786,15 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
lock(&p->context.mutex);
if (--ctx->disconnect_ops == 0) {
do_free = true;
- ctx = NULL;
+ ctx_notify = false;
if (--p->disconnects_pending == 0 && !p->contexts) {
p->inactive = true;
notify = wake(&p->context);
}
} else {
// If initiating the close, wake the pcontext to do the free.
- if (ctx)
- if (!wake(ctx))
- ctx = NULL; // Wake already pending.
+ if (ctx_notify)
+ ctx_notify = wake(ctx);
}
unlock(&p->context.mutex);
unlock(ctx_mutex);
@@ -1776,7 +1803,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
if (pc) pconnection_final_free(pc);
else listener_final_free(pcontext_listener(ctx));
} else {
- if (ctx)
+ if (ctx_notify)
wake_notify(ctx);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org