You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/03 22:14:14 UTC

[85/89] [abbrv] qpid-proton git commit: PROTON-1842: defer connection cleanup to reduce likelihood of memory corruption on shutdown. Already rare, now a thousand times more rare. Comprehensive fix still in progress.

PROTON-1842: defer connection cleanup to reduce likelihood of memory corruption on shutdown.
Already rare, now a thousand times more rare.  Comprehensive fix still in progress.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/79d90195
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/79d90195
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/79d90195

Branch: refs/heads/go1
Commit: 79d901959b8dca060d76e627f65256a66abb0778
Parents: 9fd19bc
Author: Clifford Jansen <cl...@apache.org>
Authored: Thu May 17 09:40:03 2018 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Thu May 17 09:40:03 2018 -0700

----------------------------------------------------------------------
 c/src/proactor/epoll.c | 66 +++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 61 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/79d90195/c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index d1e9820..43bf897 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -280,6 +280,11 @@ const char *AMQP_PORT_NAME = "amqp";
 // and increases latency.
 #define HOG_MAX 1
 
+// pn_proactor_t.deferred_free_list, pconnection_t.free_in_progress, pconnection_deferred_free()
+// are part of a temporary mitigation strategy for PROTON-1842 pending a comprehensive solution.
+// Cost is just under 2K bytes per lingering "deferred free" connection, limited per proactor by:
+#define MAX_DEFERRED_FREES 10
+
 /* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
    Class definitions are for identification as pn_event_t context only.
 */
@@ -409,6 +414,9 @@ struct pn_proactor_t {
   // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here.
   acceptor_t *overflow;
   pmutex overflow_mutex;
+  pmutex deferred_free_mutex;
+  void *deferred_free_list[MAX_DEFERRED_FREES];
+  int def_idx;
 };
 
 static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
@@ -521,6 +529,7 @@ typedef struct pconnection_t {
   bool read_blocked;
   bool write_blocked;
   bool disconnected;
+  bool free_in_progress;
   int hog_count; // thread hogging limiter
   pn_event_batch_t batch;
   pn_connection_driver_t driver;
@@ -815,6 +824,33 @@ static inline bool pconnection_is_final(pconnection_t *pc) {
   return !pc->current_arm && !pc->timer_armed && !pc->context.wake_ops;
 }
 
+static void pconnection_deferred_free(pconnection_t *pc) {
+  // Teardown postponed by defer_free(): these steps could confuse a thread still processing a stray socket event.
+  pmutex_finalize(&pc->rearm_mutex);
+  pn_condition_free(pc->disconnect_condition);
+  pn_connection_driver_destroy(&pc->driver);
+  pcontext_finalize(&pc->context);  // finalized last because defer_free() and pconnection_process() lock context.mutex
+  free(pc);  // pc must not be used after this point
+}
+
+static void defer_free(pconnection_t *pc) {  // Flag pc as being freed and park it in the proactor's deferred-free list.
+  lock(&pc->context.mutex);
+  pc->free_in_progress = true;  // pconnection_process() checks this flag and returns NULL for late socket events
+  memory_barrier(&pc->psocket.epoll_io);
+  unlock(&pc->context.mutex);
+
+  pn_proactor_t *p = pc->psocket.proactor;
+  lock(&p->deferred_free_mutex);
+  void **defpc = &p->deferred_free_list[p->def_idx++];  // take the next slot of the fixed-size list
+  if (p->def_idx == MAX_DEFERRED_FREES)
+    p->def_idx = 0;  // wrap so the list is reused circularly
+  pconnection_t *now_freeable = (pconnection_t *) *defpc;  // previous occupant of the slot (may be NULL)
+  *defpc = pc;  // Defer freeing this until MAX_DEFERRED_FREES other calls to this function
+  unlock(&p->deferred_free_mutex);
+  if (now_freeable)
+    pconnection_deferred_free(now_freeable);  // evicted entry is torn down after deferred_free_mutex is released
+}
+
 static void pconnection_final_free(pconnection_t *pc) {
   if (pc->driver.connection) {
     set_pconnection(pc->driver.connection, NULL);
@@ -822,11 +858,9 @@ static void pconnection_final_free(pconnection_t *pc) {
   if (pc->addrinfo) {
     freeaddrinfo(pc->addrinfo);
   }
-  pmutex_finalize(&pc->rearm_mutex);
-  pn_condition_free(pc->disconnect_condition);
-  pn_connection_driver_destroy(&pc->driver);
-  pcontext_finalize(&pc->context);
-  free(pc);
+  // Temporary deferral of "final" free.  PROTON-1842 mitigation.
+  // Proper fix expected in near future.
+  defer_free(pc);
 }
 
 // call without lock, but only if pconnection_is_final() is true
@@ -1022,6 +1056,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   bool waking = false;
   bool tick_required = false;
 
+  if (events && pc->free_in_progress)  // We should not be here... PROTON-1842
+    return NULL;
+
   // Don't touch data exclusive to working thread (yet).
 
   if (timeout) {
@@ -1031,6 +1068,13 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   lock(&pc->context.mutex);
 
   if (events) {
+    if (pc->context.closing) {
+      memory_barrier(&pc->psocket.epoll_io);
+      if (pc->free_in_progress) {
+        unlock(&pc->context.mutex);
+        return NULL;
+      }
+    }
     pc->new_events = events;
     events = 0;
   }
@@ -1089,6 +1133,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   if (pc->new_events) {
+    if (pc->context.closing && pc->current_arm == 0) {
+      unlock(&pc->context.mutex);
+      return NULL;
+    }
     pc->current_arm = 0;
     if (!pc->context.closing) {
       if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
@@ -1772,6 +1820,7 @@ pn_proactor_t *pn_proactor() {
   pcontext_init(&p->context, PROACTOR, p, p);
   pmutex_init(&p->eventfd_mutex);
   ptimer_init(&p->timer, 0);
+  pmutex_init(&p->deferred_free_mutex);
 
   if ((p->epollfd = epoll_create(1)) >= 0) {
     if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
@@ -1792,6 +1841,7 @@ pn_proactor_t *pn_proactor() {
   if (p->eventfd >= 0) close(p->eventfd);
   if (p->interruptfd >= 0) close(p->interruptfd);
   ptimer_finalize(&p->timer);
+  pmutex_finalize(&p->deferred_free_mutex);
   if (p->collector) pn_free(p->collector);
   free (p);
   return NULL;
@@ -1825,6 +1875,12 @@ void pn_proactor_free(pn_proactor_t *p) {
   pn_collector_free(p->collector);
   pmutex_finalize(&p->eventfd_mutex);
   pcontext_finalize(&p->context);
+  pmutex_finalize(&p->deferred_free_mutex);
+  for (int i = 0; i < MAX_DEFERRED_FREES; i++) {
+    pconnection_t *pc = (pconnection_t *) p->deferred_free_list[i];
+    if (pc)
+      pconnection_deferred_free(pc);
+  }
   free(p);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org