You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/09 01:25:28 UTC

[05/50] [abbrv] qpid-proton git commit: PROTON-1460: epoll - thread safe use of driver on connection disconnect

PROTON-1460: epoll - thread safe use of driver on connection disconnect


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/19f345ac
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/19f345ac
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/19f345ac

Branch: refs/heads/go1
Commit: 19f345acce3748020c7b2f13b2f0937ed35170ec
Parents: 9c69b7d
Author: Clifford Jansen <cl...@apache.org>
Authored: Tue May 9 13:06:41 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Tue May 9 13:07:56 2017 -0700

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 45 ++++++++++++++++++++++++++++++--------
 1 file changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/19f345ac/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index cea9a68..fae68e9 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -425,6 +425,8 @@ typedef struct pconnection_t {
   bool server;                /* accept, not connect */
   bool tick_pending;
   bool timer_armed;
+  bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
+  pn_condition_t *disconnect_condition;
   ptimer_t timer;  // TODO: review one timerfd per connection
   // Following values only changed by (sole) working context:
   uint32_t current_arm;  // active epoll io events
@@ -580,6 +582,8 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
   pc->wake_count = 0;
   pc->tick_pending = false;
   pc->timer_armed = false;
+  pc->queued_disconnect = false;
+  pc->disconnect_condition = NULL;
 
   pc->current_arm = 0;
   pc->connected = false;
@@ -609,6 +613,7 @@ static void pconnection_final_free(pconnection_t *pc) {
   if (pc->addrinfo) {
     freeaddrinfo(pc->addrinfo);
   }
+  pn_condition_free(pc->disconnect_condition);
   pn_incref(pc);                /* Make sure we don't do a circular free */
   pn_connection_driver_destroy(&pc->driver);
   pn_decref(pc);
@@ -707,7 +712,7 @@ static inline void pconnection_rearm(pconnection_t *pc) {
 }
 
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  if (pc->new_events || pc->wake_count || pc->tick_pending)
+  if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
     return true;
   if (!pc->read_blocked && !pconnection_rclosed(pc))
     return true;
@@ -823,6 +828,16 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
 
  retry:
 
+  if (pc->queued_disconnect) {  // From pn_proactor_disconnect()
+    pc->queued_disconnect = false;
+    if (!pc->context.closing) {
+      if (pc->disconnect_condition) {
+        pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition);
+      }
+      pn_connection_driver_close(&pc->driver);
+    }
+  }
+
   if (pconnection_has_event(pc)) {
     unlock(&pc->context.mutex);
     return &pc->batch;
@@ -1731,16 +1746,29 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
   bool notify = false;
   for (ctx = disconnecting_pcontexts; ctx; ctx = ctx ? ctx->next : NULL) {
     bool do_free = false;
+    bool ctx_notify = true;
     pmutex *ctx_mutex = NULL;
     pconnection_t *pc = pcontext_pconnection(ctx);
     if (pc) {
       ctx_mutex = &pc->context.mutex;
       lock(ctx_mutex);
       if (!ctx->closing) {
-        if (cond) {
-          pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+        if (ctx->working) {
+          // Must defer
+          pc->queued_disconnect = true;
+          if (cond) {
+            if (!pc->disconnect_condition)
+              pc->disconnect_condition = pn_condition();
+            pn_condition_copy(pc->disconnect_condition, cond);
+          }
+        }
+        else {
+          // No conflicting working context.
+          if (cond) {
+            pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+          }
+          pn_connection_driver_close(&pc->driver);
         }
-        pn_connection_driver_close(&pc->driver);
       }
     } else {
       pn_listener_t *l = pcontext_listener(ctx);
@@ -1758,16 +1786,15 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
     lock(&p->context.mutex);
     if (--ctx->disconnect_ops == 0) {
       do_free = true;
-      ctx = NULL;
+      ctx_notify = false;
       if (--p->disconnects_pending == 0 && !p->contexts) {
         p->inactive = true;
         notify = wake(&p->context);
       }
     } else {
       // If initiating the close, wake the pcontext to do the free.
-      if (ctx)
-        if (!wake(ctx))
-          ctx = NULL;  // Wake already pending.
+      if (ctx_notify)
+        ctx_notify = wake(ctx);
     }
     unlock(&p->context.mutex);
     unlock(ctx_mutex);
@@ -1776,7 +1803,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
       if (pc) pconnection_final_free(pc);
       else listener_final_free(pcontext_listener(ctx));
     } else {
-      if (ctx)
+      if (ctx_notify)
         wake_notify(ctx);
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org