You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/03/19 17:08:11 UTC
[2/2] qpid-proton git commit: PROTON-1440: libuv proactor - thread
safe pn_connection_wake
PROTON-1440: libuv proactor - thread safe pn_connection_wake
This fix does not change the API but makes pn_connection_wake thread safe.
To be thread safe we need a lock, so the pconnection_t attachment stays
on the pn_connection_t until the pn_connection_t is destroyed.
pn_proactor_free was also modified to run the normal socket close sequence
rather than a short-cut that just closes TCP sockets - this allows the
wake locking logic to run as normal, even if the application calls wake
after the proactor is freed.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/eb12513c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/eb12513c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/eb12513c
Branch: refs/heads/master
Commit: eb12513c51ae244a180ffee0819e6854774c4967
Parents: 1c19abc
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 16 16:25:31 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Sun Mar 19 13:05:45 2017 -0400
----------------------------------------------------------------------
proton-c/src/proactor/libuv.c | 109 +++++++++++++++++++++++++++----------
proton-c/src/tests/proactor.c | 9 ++-
2 files changed, 87 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/eb12513c/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 102fcdd..1d16972 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -120,7 +120,9 @@ static inline const char* fixstr(const char* str) {
return str[0] == '\001' ? NULL : str;
}
-/* a connection socket */
+typedef enum { W_NONE, W_PENDING, W_CLOSED } wake_state;
+
+/* An incoming or outgoing connection. */
typedef struct pconnection_t {
psocket_t psocket;
@@ -128,15 +130,17 @@ typedef struct pconnection_t {
pn_connection_driver_t driver;
/* Only used by leader */
- uv_connect_t connect;
uv_timer_t timer;
uv_write_t write;
uv_shutdown_t shutdown;
size_t writing; /* size of pending write request, 0 if none pending */
+ /* Outgoing connection only */
+ uv_connect_t connect;
+
/* Locked for thread-safe access */
uv_mutex_t lock;
- bool wake; /* pn_connection_wake() was called */
+ wake_state wake;
} pconnection_t;
@@ -243,8 +247,28 @@ static inline pn_listener_t *as_listener(psocket_t* ps) {
return ps->is_conn ? NULL: (pn_listener_t*)ps;
}
+/* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */
+#define CID_pconnection CID_pn_object
+#define pconnection_inspect NULL
+#define pconnection_initialize NULL
+#define pconnection_hashcode NULL
+#define pconnection_compare NULL
+
+static void pconnection_finalize(void *vp_pconnection) {
+ pconnection_t *pc = (pconnection_t*)vp_pconnection;
+ uv_mutex_destroy(&pc->lock); /* Only the lock is left to clean up */
+}
+
+
+static const pn_class_t pconnection_class = PN_CLASS(pconnection);
+
static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
- pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
+ /* pconnection_t is a pn_class instance so we can attach it to the pn_connection_t and
+ it will be finalized when the pn_connection_t is freed.
+ */
+ pconnection_t *pc =
+ (pconnection_t *) pn_class_new(&pconnection_class, sizeof(pconnection_t));
+
if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
return NULL;
}
@@ -254,8 +278,9 @@ static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool ser
pn_transport_set_server(pc->driver.transport);
}
pn_record_t *r = pn_connection_attachments(pc->driver.connection);
- pn_record_def(r, PN_PROACTOR, PN_VOID);
+ pn_record_def(r, PN_PROACTOR, &pconnection_class);
pn_record_set(r, PN_PROACTOR, pc);
+ pn_decref(pc); /* Will be deleted when the connection is */
return pc;
}
@@ -295,16 +320,20 @@ static void leader_count(pn_proactor_t *p, int change) {
uv_mutex_unlock(&p->lock);
}
-static void pconnection_free(pconnection_t *pc) {
- pn_connection_driver_destroy(&pc->driver);
- free(pc);
-}
-
static void pn_listener_free(pn_listener_t *l);
/* Final close event for for a pconnection_t, closes the timer */
static void on_close_pconnection_final(uv_handle_t *h) {
- pconnection_free((pconnection_t*)h->data);
+ /* If the life of the pn_connection_t has been extended with reference counts
+ we want the pconnection_t to have the same lifespan so calls to pn_connection_wake
+ will be valid (but no-ops)
+ */
+ pconnection_t *pc = (pconnection_t*)h->data;
+ /* Break circular references */
+ pn_incref(pc); /* Don't let driver_destroy free pc */
+ pn_connection_driver_destroy(&pc->driver);
+ pn_decref(pc);
+ /* Now pc is freed iff the connection is, otherwise remains till the is freed. */
}
static void uv_safe_close(uv_handle_t *h, uv_close_cb cb) {
@@ -361,9 +390,9 @@ static void listener_error(pn_listener_t *l, int err, const char* what) {
static void psocket_error(psocket_t *ps, int err, const char* what) {
if (ps->is_conn) {
- pconnection_error(as_pconnection(ps), err, "initialization");
+ pconnection_error(as_pconnection(ps), err, what);
} else {
- listener_error(as_listener(ps), err, "initialization");
+ listener_error(as_listener(ps), err, what);
}
}
@@ -564,9 +593,9 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
}
static void on_stopping(uv_handle_t* h, void* v) {
- /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
+ /* Mark all sockets with an error, pn_proactor_free will clear the resulting events */
if (h->type == UV_TCP) {
- uv_safe_close(h, on_close_psocket);
+ psocket_error((psocket_t*)h->data, UV_ESHUTDOWN, "proactor free");
}
}
@@ -628,13 +657,15 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
return NULL;
}
-/* Check and reset the wake flag */
-static bool check_wake(pconnection_t *pc) {
+/* Check wake state and generate WAKE event if needed */
+static void check_wake(pconnection_t *pc) {
uv_mutex_lock(&pc->lock);
- bool wake = pc->wake;
- pc->wake = false;
+ if (pc->wake == W_PENDING) {
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+ pc->wake = W_NONE;
+ }
uv_mutex_unlock(&pc->lock);
- return wake;
}
/* Process a pconnection, return true if it has events for a worker thread */
@@ -648,14 +679,13 @@ static bool leader_process_pconnection(pconnection_t *pc) {
/* Start the connection if not already connected */
leader_connect(pc);
} else if (pn_connection_driver_finished(&pc->driver)) {
- /* Close if the connection is finished */
+ uv_mutex_lock(&pc->lock);
+ pc->wake = W_CLOSED; /* wake() cannot notify anymore */
+ uv_mutex_unlock(&pc->lock);
uv_safe_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
} else {
/* Check for events that can be generated without blocking for IO */
- if (check_wake(pc)) {
- pn_connection_t *c = pc->driver.connection;
- pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
- }
+ check_wake(pc);
pn_millis_t next_tick = leader_tick(pc);
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
@@ -857,7 +887,7 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int
}
pn_proactor_t *pn_proactor() {
- pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+ pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(pn_proactor_t));
p->collector = pn_collector();
p->batch.next_event = &proactor_batch_next;
if (!p->collector) return NULL;
@@ -871,12 +901,25 @@ pn_proactor_t *pn_proactor() {
}
void pn_proactor_free(pn_proactor_t *p) {
+ if (p->count > 0) {
+ uv_walk(&p->loop, on_stopping, NULL); /* Set errors on all sockets */
+ /* Drain all events so sockets can close normally */
+ pn_event_t *e = NULL;
+ do {
+ pn_event_batch_t *eb = pn_proactor_wait(p);
+ e = pn_event_batch_next(eb);
+ while (e && pn_event_type(e) != PN_PROACTOR_INACTIVE) {
+ e = pn_event_batch_next(eb);
+ }
+ pn_proactor_done(p, eb);
+ } while (pn_event_type(e) != PN_PROACTOR_INACTIVE);
+ }
+ /* Close the the proactor handles */
uv_timer_stop(&p->timer);
uv_safe_close((uv_handle_t*)&p->timer, NULL);
uv_safe_close((uv_handle_t*)&p->async, NULL);
- uv_walk(&p->loop, on_stopping, NULL); /* Close all TCP handles */
while (uv_loop_alive(&p->loop)) {
- uv_run(&p->loop, UV_RUN_ONCE); /* Run till all handles closed */
+ uv_run(&p->loop, UV_RUN_NOWAIT); /* Run till all handles closed */
}
uv_loop_close(&p->loop);
uv_mutex_destroy(&p->lock);
@@ -894,10 +937,16 @@ void pn_connection_wake(pn_connection_t* c) {
/* May be called from any thread */
pconnection_t *pc = get_pconnection(c);
if (pc) {
+ bool notify = false;
uv_mutex_lock(&pc->lock);
- pc->wake = true;
+ if (pc->wake == W_NONE) {
+ pc->wake = W_PENDING;
+ notify = true;
+ }
uv_mutex_unlock(&pc->lock);
- psocket_notify(&pc->psocket);
+ if (notify) {
+ psocket_notify(&pc->psocket);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/eb12513c/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index beba46e..41d889b 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -193,15 +193,22 @@ static void test_connection_wake(test_t *t) {
test_port_t port = test_port(localhost); /* Hold a port */
pn_proactor_listen(server, pn_listener(), port.host_port, 4);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ sock_close(port.sock);
+
pn_connection_t *c = pn_connection();
+ pn_incref(c); /* Keep c alive after proactor frees it */
pn_proactor_connect(client, c, port.host_port);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
pn_connection_wake(c);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
- sock_close(port.sock);
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
PROACTOR_TEST_FREE(pts);
+
+ /* The pn_connection_t is still valid so wake is legal but a no-op */
+ pn_connection_wake(c);
+ pn_decref(c);
}
/* Test that INACTIVE event is generated when last connections/listeners closes. */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org