You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/01 15:04:06 UTC
[38/50] qpid-proton git commit: PROTON-1564: fix ref counting and
pconnection binding thread safety for pconnection setup and teardown
PROTON-1564: fix ref counting and pconnection binding thread safety for pconnection setup and teardown
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b9525a68
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b9525a68
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b9525a68
Branch: refs/heads/go1
Commit: b9525a68b3929f004fea362dda35271aa5a5b464
Parents: 540622e
Author: Clifford Jansen <cl...@apache.org>
Authored: Tue Aug 29 22:25:41 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Tue Aug 29 22:27:49 2017 -0700
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 59 +++++++++++++++++++++++++++++---------
1 file changed, 45 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b9525a68/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index fdb660c..3faeb1a 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -386,6 +386,7 @@ struct pn_proactor_t {
// wake subsystem
int eventfd;
pmutex eventfd_mutex;
+ pmutex bind_mutex;
bool wakes_in_progress;
pcontext_t *wake_list_first;
pcontext_t *wake_list_last;
@@ -510,6 +511,7 @@ typedef struct pconnection_t {
bool read_blocked;
bool write_blocked;
bool disconnected;
+ bool bound;
int hog_count; // thread hogging limiter
pn_event_batch_t batch;
pn_connection_driver_t driver;
@@ -692,13 +694,23 @@ static const pn_class_t pconnection_class = PN_CLASS(pconnection);
static void pconnection_tick(pconnection_t *pc);
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr)
+static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr)
{
- pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
- if (!pc) return NULL;
+ lock(&p->bind_mutex);
+ pn_record_t *r = pn_connection_attachments(c);
+ if (pn_record_get(r, PN_PROACTOR)) {
+ unlock(&p->bind_mutex);
+ free(pc);
+ return "pn_connection_t already in use";
+ }
+ pn_record_def(r, PN_PROACTOR, &pconnection_class);
+ pn_record_set(r, PN_PROACTOR, pc);
+ pc->bound = true;
+ unlock(&p->bind_mutex);
+
if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
free(pc);
- return NULL;
+ return "pn_connection_driver_init failure";
}
pcontext_init(&pc->context, PCONNECTION, p, pc);
psocket_init(&pc->psocket, p, NULL, addr);
@@ -720,9 +732,6 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
if (server) {
pn_transport_set_server(pc->driver.transport);
}
- pn_record_t *r = pn_connection_attachments(pc->driver.connection);
- pn_record_def(r, PN_PROACTOR, &pconnection_class);
- pn_record_set(r, PN_PROACTOR, pc);
if (!ptimer_init(&pc->timer, &pc->psocket)) {
psocket_error(&pc->psocket, errno, "timer setup");
@@ -731,7 +740,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
pmutex_init(&pc->rearm_mutex);
pn_decref(pc); /* Will be deleted when the connection is */
- return pc;
+ return NULL;
}
// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled.
@@ -746,10 +755,11 @@ static void pconnection_final_free(pconnection_t *pc) {
}
pmutex_finalize(&pc->rearm_mutex);
pn_condition_free(pc->disconnect_condition);
- pn_incref(pc); /* Make sure we don't do a circular free */
+ if (pc->bound)
+ pn_incref(pc); /* Make sure we don't do a circular free */
pn_connection_driver_destroy(&pc->driver);
pn_decref(pc);
- /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
+ /* Freed if not bound, otherwise pc is freed iff the pn_connection_t is freed. */
}
// call without lock, but only if pconnection_is_final() is true
@@ -1214,9 +1224,15 @@ static bool wake_if_inactive(pn_proactor_t *p) {
}
void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
- pconnection_t *pc = new_pconnection_t(p, c, false, addr);
+ pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
assert(pc); // TODO: memory safety
+ const char *err = pconnection_setup(pc, p, c, false, addr);
+ if (err) {
+ pn_logf("pn_proactor_connect failure: %s", err);
+ return;
+ }
// TODO: check case of proactor shutting down
+
lock(&pc->context.mutex);
proactor_add(&pc->context);
pn_connection_open(pc->driver.connection); /* Auto-open */
@@ -1276,7 +1292,15 @@ void pn_proactor_release_connection(pn_connection_t *c) {
pconnection_t *pc = get_pconnection(c);
if (pc) {
lock(&pc->context.mutex);
+ // reverse lifecycle entanglement of pc and c from new_pconnection_t()
+ pn_incref(pc);
+ pn_proactor_t *p = pc->psocket.proactor;
+ lock(&p->bind_mutex);
+ pn_record_t *r = pn_connection_attachments(pc->driver.connection);
+ pn_record_set(r, PN_PROACTOR, NULL);
pn_connection_driver_release_connection(&pc->driver);
+ pc->bound = false; // Transport unbound
+ unlock(&p->bind_mutex);
pconnection_begin_close(pc);
notify = wake(&pc->context);
unlock(&pc->context.mutex);
@@ -1548,9 +1572,14 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
}
void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+ pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
+ assert(pc); // TODO: memory safety
+ const char *err = pconnection_setup(pc, l->psockets[0].proactor, c, true, "");
+ if (err) {
+ pn_logf("pn_listener_accept failure: %s", err);
+ return;
+ }
// TODO: fuller sanity check on input args
- pconnection_t *pc = new_pconnection_t(l->psockets[0].proactor, c, true, "");
- assert(pc); // TODO: memory safety
lock(&l->context.mutex);
int fd = l->accepted_fd;
@@ -1587,6 +1616,7 @@ pn_proactor_t *pn_proactor() {
p->epollfd = p->eventfd = p->timer.timerfd = -1;
pcontext_init(&p->context, PROACTOR, p, p);
pmutex_init(&p->eventfd_mutex);
+ pmutex_init(&p->bind_mutex);
ptimer_init(&p->timer, 0);
if ((p->epollfd = epoll_create(1)) >= 0) {
@@ -1640,6 +1670,7 @@ void pn_proactor_free(pn_proactor_t *p) {
pn_collector_free(p->collector);
pmutex_finalize(&p->eventfd_mutex);
+ pmutex_finalize(&p->bind_mutex);
pcontext_finalize(&p->context);
free(p);
}
@@ -1649,7 +1680,7 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
pn_listener_t *l = pn_event_listener(e);
if (l) return l->psockets[0].proactor;
pn_connection_t *c = pn_event_connection(e);
- if (c) return pn_connection_proactor(pn_event_connection(e));
+ if (c) return pn_connection_proactor(c);
return NULL;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org