You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/04/09 23:44:40 UTC

qpid-proton git commit: PROTON-1460: add pn_proactor_disconnect()

Repository: qpid-proton
Updated Branches:
  refs/heads/master 2d3a9de8f -> 54923953a


PROTON-1460: add pn_proactor_disconnect()


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/54923953
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/54923953
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/54923953

Branch: refs/heads/master
Commit: 54923953a20a719b399de8c328b582a466690680
Parents: 2d3a9de
Author: Clifford Jansen <cl...@apache.org>
Authored: Sun Apr 9 16:42:43 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Sun Apr 9 16:42:43 2017 -0700

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 153 +++++++++++++++++++++++++++++--------
 1 file changed, 120 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/54923953/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 511b95d..85ee99d 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -63,6 +63,11 @@
 // First define a proactor mutex (pmutex) and timer mechanism (ptimer) to taste.
 // ========================================================================
 
+// In general, all locks are to be held singly and briefly (possibly as spin locks).
+// Exception: psockets+proactor for pn_proactor_disconnect (convention: acquire
+// psocket first to avoid deadlock).  TODO: revisit the exception and its
+// awkwardness in the code (additional mutex? different type?).
+
 typedef pthread_mutex_t pmutex;
 static void pmutex_init(pthread_mutex_t *pm){
   pthread_mutexattr_t attr;
@@ -87,7 +92,7 @@ typedef enum {
   LISTENER_IO,
   PROACTOR_TIMER } epoll_type_t;
 
-// Context to use with epoll.
+// Data to use with epoll.
 typedef struct epoll_extended_t {
   psocket_t *psocket;  // pconnection, listener, or NULL -> proactor
   int fd;
@@ -275,8 +280,12 @@ static void pcontext_finalize(pcontext_t* ctx) {
 /* common to connection and listener */
 typedef struct psocket_t {
   pn_proactor_t *proactor;
+  // Next 4 are protected by the proactor mutex
   struct psocket_t* next;   /* Protected by proactor.mutex */
   struct psocket_t* prev;   /* Protected by proactor.mutex */
+  bool disconnecting;       /* pn_proactor_disconnect */
+  int disconnect_ops;       /* ops remaining before disconnect complete */
+  // Remaining protected by the pconnection/listener mutex
   int sockfd;
   epoll_extended_t epoll_io;
   bool is_conn;
@@ -294,8 +303,9 @@ struct pn_proactor_t {
   epoll_extended_t epoll_wake;
   pn_event_t *cached_event;
   pn_event_batch_t batch;
-  size_t interrupts;             /* total pending interrupts */
+  size_t interrupts;            /* total pending interrupts */
   size_t deferred_interrupts;   /* interrupts for current batch */
+  size_t disconnects_pending;   /* unfinished proactor disconnects*/
   bool inactive;
   bool timer_expired;
   bool timer_cancelled;
@@ -395,6 +405,8 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const ch
   ps->proactor = p;
   ps->next = NULL;
   ps->prev = NULL;
+  ps->disconnecting = false;
+  ps->disconnect_ops = 0;
   ps->is_conn = is_conn;
   ps->closing = false;
   ps->sockfd = -1;
@@ -456,7 +468,7 @@ struct pn_listener_t {
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup);
 static void listener_begin_close(pn_listener_t* l);
 static void proactor_add(psocket_t *ps);
-static void proactor_remove(psocket_t *ps);
+static bool proactor_remove(psocket_t *ps);
 
 static inline pconnection_t *as_pconnection(psocket_t* ps) {
   return ps->is_conn ? (pconnection_t*)ps : NULL;
@@ -586,17 +598,25 @@ static inline bool pconnection_is_final(pconnection_t *pc) {
   return !pc->current_arm && !pc->timer.pending_count && !pc->context.wake_ops;
 }
 
+static void pconnection_final_free(pconnection_t *pc) {  /* Destroy pc's connection driver; pc may be freed on return */
+  pn_incref(pc);                /* Make sure we don't do a circular free while the driver is destroyed */
+  pn_connection_driver_destroy(&pc->driver);
+  pn_decref(pc);                /* Drop the reference taken above */
+  /* Now pc is freed iff the connection is; otherwise it remains until the pn_connection_t is freed. */
+}
+
 // call without lock, but only if pconnection_is_final() is true
 static void pconnection_cleanup(pconnection_t *pc) {
   if (pc->psocket.sockfd != -1)
     close(pc->psocket.sockfd);
-  proactor_remove(&pc->psocket);
   stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
   ptimer_finalize(&pc->timer);
-  pn_incref(pc);                /* Make sure we don't do a circular free */
-  pn_connection_driver_destroy(&pc->driver);
-  pn_decref(pc);
-  /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
+  lock(&pc->context.mutex);     // proactor_remove() must be called with the psocket's mutex held
+  bool can_free = proactor_remove(&pc->psocket);  // false while pn_proactor_disconnect() is still processing this psocket
+  unlock(&pc->context.mutex);
+  if (can_free)
+    pconnection_final_free(pc);
+  // else: the pn_proactor_disconnect() logic owns the psocket and performs its final free
 }
 
 // Call with lock held or from forced_shutdown
@@ -1078,16 +1098,24 @@ static inline bool listener_can_free(pn_listener_t *l) {
     !l->context.wake_ops;
 }
 
+static inline void listener_final_free(pn_listener_t *l) {  // Finalize the listener's context and release its memory; l is invalid afterwards
+  pcontext_finalize(&l->context);
+  free(l);
+}
+
 void pn_listener_free(pn_listener_t *l) {
   // TODO: do we need a QPID DeletionManager equivalent to be safe from inbound connection (accept) epoll events?
-  // TODO: handle external call by user
-  proactor_remove(&l->psocket);
   if (l) {
     if (l->collector) pn_collector_free(l->collector);
     if (l->condition) pn_condition_free(l->condition);
     if (l->attachments) pn_free(l->attachments);
-    pcontext_finalize(&l->context);
-    free(l);
+    lock(&l->context.mutex);    // proactor_remove() must be called with the psocket's mutex held
+    bool can_free = proactor_remove(&l->psocket);  // false while pn_proactor_disconnect() is still processing this psocket
+    unlock(&l->context.mutex);
+    if (can_free) {
+      listener_final_free(l);
+      return;
+    } // else: the pn_proactor_disconnect() logic has assumed ownership and performs the final free
   }
 }
 
@@ -1410,27 +1438,45 @@ static void proactor_add(psocket_t *ps) {
   unlock(&p->context.mutex);
 }
 
-static void proactor_remove(psocket_t *ps) {
+// call with psocket's mutex held
+// return true if safe for caller to free psocket
+static bool proactor_remove(psocket_t *ps) {
   pn_proactor_t *p = ps->proactor;
   lock(&p->context.mutex);
   bool notify = false;
-  if (ps->prev)
-    ps->prev->next = ps->next;
-  else {
-    p->psockets = ps->next;
-    ps->next = NULL;
-    if (p->psockets)
-      p->psockets->prev = NULL;
+  bool can_free = true;
+  if (ps->disconnecting) {
+    // No longer on psockets list: pn_proactor_disconnect() detached the whole list
+    if (--ps->disconnect_ops == 0) {   // this was the last pending op for this psocket
+      if (--p->disconnects_pending == 0 && !p->psockets) {   // no disconnects left and nothing registered
+        p->inactive = true;
+        notify = wake(&p->context);
+      }
+    }
+    else                  // pn_proactor_disconnect() is still processing
+      can_free = false;   // this psocket, and will free it
   }
-  if (ps->next)
-    ps->next->prev = ps->prev;
+  else {
+    // normal case: unlink ps from the proactor's doubly linked psockets list
+    if (ps->prev)
+      ps->prev->next = ps->next;
+    else {
+      p->psockets = ps->next;   // ps was the list head
+      ps->next = NULL;
+      if (p->psockets)
+        p->psockets->prev = NULL;
+    }
+    if (ps->next)
+      ps->next->prev = ps->prev;
 
-  if (!p->psockets && !p->shutting_down) {
-    p->inactive = true;
-    notify = wake(&p->context);
+    if (!p->psockets && !p->disconnects_pending && !p->shutting_down) {
+      p->inactive = true;
+      notify = wake(&p->context);
+    }
   }
   unlock(&p->context.mutex);
   if (notify) wake_notify(&p->context);
+  return can_free;
 }
 
 static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p) {
@@ -1574,27 +1620,75 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
 }
 
+// Disconnect every psocket currently on the proactor's list.  Each one not already
+// closing gets cond (if non-NULL) copied into its transport or listener condition
+// and is then closed.  A psocket is freed by whichever of the second pass below or
+// proactor_remove() drops its disconnect_ops to zero.
 void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
-  // TODO: make this boilerplate actually work without deadlocking
   lock(&p->context.mutex);
-  psocket_t *ps = p->psockets;
+  // Move the whole psockets list into a disconnecting state
+  psocket_t *disconnecting_psockets = p->psockets;
+  p->psockets = NULL;
+  // First pass: mark each psocket as disconnecting and update global pending count.
+  psocket_t *ps = disconnecting_psockets;
   while (ps) {
+    ps->disconnecting = true;
+    ps->disconnect_ops = 2;   // Second pass below and proactor_remove(), in any order.
+    p->disconnects_pending++;
+    ps = ps->next;
+  }
+  unlock(&p->context.mutex);
+  if (!disconnecting_psockets)
+    return;
+
+  // Second pass: different locking, close the psockets, free them if !disconnect_ops
+  bool notify = false;
+  psocket_t *next = NULL;     // successor of ps, captured before ps can be freed
+  for (ps = disconnecting_psockets; ps; ps = next) {
+    bool do_free = false;
+    pmutex *ps_mutex = NULL;
     pconnection_t *pc = as_pconnection(ps);
     if (pc) {
-      if (cond) {
-         pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
-       }
-       pn_connection_driver_close(&pc->driver);
+      ps_mutex = &pc->context.mutex;
+      lock(ps_mutex);
+      if (!ps->closing) {
+        if (cond) {
+          pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+        }
+        pn_connection_driver_close(&pc->driver);
+      }
     } else {
       pn_listener_t *l = as_listener(ps);
-      if (l) {
+      assert(l);
+      ps_mutex = &l->context.mutex;
+      lock(ps_mutex);
+      if (!ps->closing) {
         if (cond) {
           pn_condition_copy(pn_listener_condition(l), cond);
         }
+        // NOTE(review): l->context.mutex is held here; confirm pn_listener_close() does not lock it again.
         pn_listener_close(l);
       }
     }
+
+    lock(&p->context.mutex);
+    next = ps->next;          // read now: once the locks drop, ps may be freed here or by proactor_remove()'s caller
+    if (--ps->disconnect_ops == 0) {
+      do_free = true;
+      if (--p->disconnects_pending == 0 && !p->psockets) {
+        p->inactive = true;
+        notify = wake(&p->context);
+      }
+    }
+    unlock(&p->context.mutex);
+    unlock(ps_mutex);
+
+    if (do_free) {
+      if (pc) pconnection_final_free(pc);
+      else listener_final_free(as_listener(ps));
+    }
   }
-  unlock(&p->context.mutex);
+  if (notify)
+    wake_notify(&p->context);
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org