You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/01 15:04:06 UTC

[38/50] qpid-proton git commit: PROTON-1564: fix ref counting and pconnection binding thread safety for pconnection setup and teardown

PROTON-1564: fix ref counting and pconnection binding thread safety for pconnection setup and teardown


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b9525a68
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b9525a68
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b9525a68

Branch: refs/heads/go1
Commit: b9525a68b3929f004fea362dda35271aa5a5b464
Parents: 540622e
Author: Clifford Jansen <cl...@apache.org>
Authored: Tue Aug 29 22:25:41 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Tue Aug 29 22:27:49 2017 -0700

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 59 +++++++++++++++++++++++++++++---------
 1 file changed, 45 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b9525a68/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index fdb660c..3faeb1a 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -386,6 +386,7 @@ struct pn_proactor_t {
   // wake subsystem
   int eventfd;
   pmutex eventfd_mutex;
+  pmutex bind_mutex;
   bool wakes_in_progress;
   pcontext_t *wake_list_first;
   pcontext_t *wake_list_last;
@@ -510,6 +511,7 @@ typedef struct pconnection_t {
   bool read_blocked;
   bool write_blocked;
   bool disconnected;
+  bool bound;
   int hog_count; // thread hogging limiter
   pn_event_batch_t batch;
   pn_connection_driver_t driver;
@@ -692,13 +694,23 @@ static const pn_class_t pconnection_class = PN_CLASS(pconnection);
 
 static void pconnection_tick(pconnection_t *pc);
 
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr)
+static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr)
 {
-  pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
-  if (!pc) return NULL;
+  lock(&p->bind_mutex);
+  pn_record_t *r = pn_connection_attachments(c);
+  if (pn_record_get(r, PN_PROACTOR)) {
+    unlock(&p->bind_mutex);
+    free(pc);
+    return "pn_connection_t already in use";
+  }
+  pn_record_def(r, PN_PROACTOR, &pconnection_class);
+  pn_record_set(r, PN_PROACTOR, pc);
+  pc->bound = true;
+  unlock(&p->bind_mutex);
+
   if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
     free(pc);
-    return NULL;
+    return "pn_connection_driver_init failure";
   }
   pcontext_init(&pc->context, PCONNECTION, p, pc);
   psocket_init(&pc->psocket, p, NULL, addr);
@@ -720,9 +732,6 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
   if (server) {
     pn_transport_set_server(pc->driver.transport);
   }
-  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
-  pn_record_def(r, PN_PROACTOR, &pconnection_class);
-  pn_record_set(r, PN_PROACTOR, pc);
 
   if (!ptimer_init(&pc->timer, &pc->psocket)) {
     psocket_error(&pc->psocket, errno, "timer setup");
@@ -731,7 +740,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
   pmutex_init(&pc->rearm_mutex);
 
   pn_decref(pc);                /* Will be deleted when the connection is */
-  return pc;
+  return NULL;
 }
 
 // Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled.
@@ -746,10 +755,11 @@ static void pconnection_final_free(pconnection_t *pc) {
   }
   pmutex_finalize(&pc->rearm_mutex);
   pn_condition_free(pc->disconnect_condition);
-  pn_incref(pc);                /* Make sure we don't do a circular free */
+  if (pc->bound)
+      pn_incref(pc);                /* Make sure we don't do a circular free */
   pn_connection_driver_destroy(&pc->driver);
   pn_decref(pc);
-  /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
+  /* Freed if not bound, otherwise pc is freed iff the pn_connection_t is freed. */
 }
 
 // call without lock, but only if pconnection_is_final() is true
@@ -1214,9 +1224,15 @@ static bool wake_if_inactive(pn_proactor_t *p) {
 }
 
 void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
-  pconnection_t *pc = new_pconnection_t(p, c, false, addr);
+  pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
   assert(pc); // TODO: memory safety
+  const char *err = pconnection_setup(pc, p, c, false, addr);
+  if (err) {
+    pn_logf("pn_proactor_connect failure: %s", err);
+    return;
+  }
   // TODO: check case of proactor shutting down
+
   lock(&pc->context.mutex);
   proactor_add(&pc->context);
   pn_connection_open(pc->driver.connection); /* Auto-open */
@@ -1276,7 +1292,15 @@ void pn_proactor_release_connection(pn_connection_t *c) {
   pconnection_t *pc = get_pconnection(c);
   if (pc) {
     lock(&pc->context.mutex);
+    // reverse lifecycle entanglement of pc and c from new_pconnection_t()
+    pn_incref(pc);
+    pn_proactor_t *p = pc->psocket.proactor;
+    lock(&p->bind_mutex);
+    pn_record_t *r = pn_connection_attachments(pc->driver.connection);
+    pn_record_set(r, PN_PROACTOR, NULL);
     pn_connection_driver_release_connection(&pc->driver);
+    pc->bound = false;  // Transport unbound
+    unlock(&p->bind_mutex);
     pconnection_begin_close(pc);
     notify = wake(&pc->context);
     unlock(&pc->context.mutex);
@@ -1548,9 +1572,14 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 }
 
 void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+  pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
+  assert(pc); // TODO: memory safety
+  const char *err = pconnection_setup(pc, l->psockets[0].proactor, c, true, "");
+  if (err) {
+    pn_logf("pn_listener_accept failure: %s", err);
+    return;
+  }
   // TODO: fuller sanity check on input args
-  pconnection_t *pc = new_pconnection_t(l->psockets[0].proactor, c, true, "");
-  assert(pc);  // TODO: memory safety
 
   lock(&l->context.mutex);
   int fd = l->accepted_fd;
@@ -1587,6 +1616,7 @@ pn_proactor_t *pn_proactor() {
   p->epollfd = p->eventfd = p->timer.timerfd = -1;
   pcontext_init(&p->context, PROACTOR, p, p);
   pmutex_init(&p->eventfd_mutex);
+  pmutex_init(&p->bind_mutex);
   ptimer_init(&p->timer, 0);
 
   if ((p->epollfd = epoll_create(1)) >= 0) {
@@ -1640,6 +1670,7 @@ void pn_proactor_free(pn_proactor_t *p) {
 
   pn_collector_free(p->collector);
   pmutex_finalize(&p->eventfd_mutex);
+  pmutex_finalize(&p->bind_mutex);
   pcontext_finalize(&p->context);
   free(p);
 }
@@ -1649,7 +1680,7 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
   pn_listener_t *l = pn_event_listener(e);
   if (l) return l->psockets[0].proactor;
   pn_connection_t *c = pn_event_connection(e);
-  if (c) return pn_connection_proactor(pn_event_connection(e));
+  if (c) return pn_connection_proactor(c);
   return NULL;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org