You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/03 22:14:14 UTC
[85/89] [abbrv] qpid-proton git commit: PROTON-1842: defer connection
cleanup to reduce likelihood of memory corruption on shutdown. Already rare,
now a thousand times more rare. Comprehensive fix still in progress.
PROTON-1842: defer connection cleanup to reduce likelihood of memory corruption on shutdown.
Already rare, now a thousand times more rare. Comprehensive fix still in progress.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/79d90195
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/79d90195
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/79d90195
Branch: refs/heads/go1
Commit: 79d901959b8dca060d76e627f65256a66abb0778
Parents: 9fd19bc
Author: Clifford Jansen <cl...@apache.org>
Authored: Thu May 17 09:40:03 2018 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Thu May 17 09:40:03 2018 -0700
----------------------------------------------------------------------
c/src/proactor/epoll.c | 66 +++++++++++++++++++++++++++++++++++++++++----
1 file changed, 61 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/79d90195/c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index d1e9820..43bf897 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -280,6 +280,11 @@ const char *AMQP_PORT_NAME = "amqp";
// and increases latency.
#define HOG_MAX 1
+// pn_proactor_t.deferred_free_list, pconnection_t.free_in_progress, pconnection_deferred_free()
+// are part of a temporary mitigation strategy for PROTON-1842 pending a comprehensive solution.
+// Cost is just under 2K bytes per lingering "deferred free" connection, limited per proactor by:
+#define MAX_DEFERRED_FREES 10
+
/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
Class definitions are for identification as pn_event_t context only.
*/
@@ -409,6 +414,9 @@ struct pn_proactor_t {
// If the process runs out of file descriptors, disarm listening sockets temporarily and save them here.
acceptor_t *overflow;
pmutex overflow_mutex;
+ pmutex deferred_free_mutex;
+ void *deferred_free_list[MAX_DEFERRED_FREES];
+ int def_idx;
};
static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
@@ -521,6 +529,7 @@ typedef struct pconnection_t {
bool read_blocked;
bool write_blocked;
bool disconnected;
+ bool free_in_progress;
int hog_count; // thread hogging limiter
pn_event_batch_t batch;
pn_connection_driver_t driver;
@@ -815,6 +824,33 @@ static inline bool pconnection_is_final(pconnection_t *pc) {
return !pc->current_arm && !pc->timer_armed && !pc->context.wake_ops;
}
+static void pconnection_deferred_free(pconnection_t *pc) { // Performs the teardown that defer_free() postpones: finalizes pc's mutex/context, destroys its driver, then releases pc itself.
+ // Delay all cleanup that could confuse a thread processing a stray socket event.
+ pmutex_finalize(&pc->rearm_mutex);
+ pn_condition_free(pc->disconnect_condition);
+ pn_connection_driver_destroy(&pc->driver);
+ pcontext_finalize(&pc->context);
+ free(pc); // last step: pc is invalid once this returns
+}
+
+static void defer_free(pconnection_t *pc) { // Flags pc as being freed and parks it in its proactor's deferred_free_list; the entry it displaces (if any) is freed for real.
+ lock(&pc->context.mutex);
+ pc->free_in_progress = true; // pconnection_process() tests this flag and returns early when it is set
+ memory_barrier(&pc->psocket.epoll_io);
+ unlock(&pc->context.mutex);
+
+ pn_proactor_t *p = pc->psocket.proactor;
+ lock(&p->deferred_free_mutex);
+ void **defpc = &p->deferred_free_list[p->def_idx++]; // take the current slot, then advance the index
+ if (p->def_idx == MAX_DEFERRED_FREES)
+ p->def_idx = 0; // wrap around so the fixed-size list is reused cyclically
+ pconnection_t *now_freeable = (pconnection_t *) *defpc; // previous occupant of the slot; NULL presumably means the slot was unused (assumes the list starts zeroed - confirm)
+ *defpc = pc; // Defer freeing this until MAX_DEFERRED_FREES other calls to this function
+ unlock(&p->deferred_free_mutex);
+ if (now_freeable) // the displaced entry is freed only after the list mutex has been released
+ pconnection_deferred_free(now_freeable);
+}
+
static void pconnection_final_free(pconnection_t *pc) {
if (pc->driver.connection) {
set_pconnection(pc->driver.connection, NULL);
@@ -822,11 +858,9 @@ static void pconnection_final_free(pconnection_t *pc) {
if (pc->addrinfo) {
freeaddrinfo(pc->addrinfo);
}
- pmutex_finalize(&pc->rearm_mutex);
- pn_condition_free(pc->disconnect_condition);
- pn_connection_driver_destroy(&pc->driver);
- pcontext_finalize(&pc->context);
- free(pc);
+ // Temporary deferral of "final" free. PROTON-1842 mitigation.
+ // Proper fix expected in near future.
+ defer_free(pc);
}
// call without lock, but only if pconnection_is_final() is true
@@ -1022,6 +1056,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
bool waking = false;
bool tick_required = false;
+ if (events && pc->free_in_progress) // We should not be here... PROTON-1842
+ return NULL;
+
// Don't touch data exclusive to working thread (yet).
if (timeout) {
@@ -1031,6 +1068,13 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
lock(&pc->context.mutex);
if (events) {
+ if (pc->context.closing) {
+ memory_barrier(&pc->psocket.epoll_io);
+ if (pc->free_in_progress) {
+ unlock(&pc->context.mutex);
+ return NULL;
+ }
+ }
pc->new_events = events;
events = 0;
}
@@ -1089,6 +1133,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
if (pc->new_events) {
+ if (pc->context.closing && pc->current_arm == 0) {
+ unlock(&pc->context.mutex);
+ return NULL;
+ }
pc->current_arm = 0;
if (!pc->context.closing) {
if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
@@ -1772,6 +1820,7 @@ pn_proactor_t *pn_proactor() {
pcontext_init(&p->context, PROACTOR, p, p);
pmutex_init(&p->eventfd_mutex);
ptimer_init(&p->timer, 0);
+ pmutex_init(&p->deferred_free_mutex);
if ((p->epollfd = epoll_create(1)) >= 0) {
if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
@@ -1792,6 +1841,7 @@ pn_proactor_t *pn_proactor() {
if (p->eventfd >= 0) close(p->eventfd);
if (p->interruptfd >= 0) close(p->interruptfd);
ptimer_finalize(&p->timer);
+ pmutex_finalize(&p->deferred_free_mutex);
if (p->collector) pn_free(p->collector);
free (p);
return NULL;
@@ -1825,6 +1875,12 @@ void pn_proactor_free(pn_proactor_t *p) {
pn_collector_free(p->collector);
pmutex_finalize(&p->eventfd_mutex);
pcontext_finalize(&p->context);
+ pmutex_finalize(&p->deferred_free_mutex);
+ for (int i = 0; i < MAX_DEFERRED_FREES; i++) {
+ pconnection_t *pc = (pconnection_t *) p->deferred_free_list[i];
+ if (pc)
+ pconnection_deferred_free(pc);
+ }
free(p);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org