You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/03/19 17:08:11 UTC

[2/2] qpid-proton git commit: PROTON-1440: libuv proactor - thread safe pn_connection_wake

PROTON-1440: libuv proactor - thread safe pn_connection_wake

This fix does not change the API but makes pn_connection_wake thread safe.

To be thread safe we need a lock, so the pconnection_t attachment stays
on the pn_connection_t until the pn_connection_t is destroyed.

pn_proactor_free also was modified to run the normal socket close sequence
rather than a short-cut that just closes TCP sockets - this allows the
wake locking logic to run as normal, even if the application calls wake
after the proactor is freed.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/eb12513c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/eb12513c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/eb12513c

Branch: refs/heads/master
Commit: eb12513c51ae244a180ffee0819e6854774c4967
Parents: 1c19abc
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 16 16:25:31 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Sun Mar 19 13:05:45 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/libuv.c | 109 +++++++++++++++++++++++++++----------
 proton-c/src/tests/proactor.c |   9 ++-
 2 files changed, 87 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/eb12513c/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 102fcdd..1d16972 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -120,7 +120,9 @@ static inline const char* fixstr(const char* str) {
   return str[0] == '\001' ? NULL : str;
 }
 
-/* a connection socket  */
+typedef enum { W_NONE, W_PENDING, W_CLOSED } wake_state;
+
+/* An incoming or outgoing connection. */
 typedef struct pconnection_t {
   psocket_t psocket;
 
@@ -128,15 +130,17 @@ typedef struct pconnection_t {
   pn_connection_driver_t driver;
 
   /* Only used by leader */
-  uv_connect_t connect;
   uv_timer_t timer;
   uv_write_t write;
   uv_shutdown_t shutdown;
   size_t writing;               /* size of pending write request, 0 if none pending */
 
+  /* Outgoing connection only */
+  uv_connect_t connect;
+
   /* Locked for thread-safe access */
   uv_mutex_t lock;
-  bool wake;                    /* pn_connection_wake() was called */
+  wake_state wake;
 } pconnection_t;
 
 
@@ -243,8 +247,28 @@ static inline pn_listener_t *as_listener(psocket_t* ps) {
   return ps->is_conn ? NULL: (pn_listener_t*)ps;
 }
 
+/* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */
+#define CID_pconnection CID_pn_object
+#define pconnection_inspect NULL
+#define pconnection_initialize NULL
+#define pconnection_hashcode NULL
+#define pconnection_compare NULL
+
+static void pconnection_finalize(void *vp_pconnection) {
+  pconnection_t *pc = (pconnection_t*)vp_pconnection;
+  uv_mutex_destroy(&pc->lock);   /* Only the lock is left to clean up */
+}
+
+
+static const pn_class_t pconnection_class = PN_CLASS(pconnection);
+
 static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
-  pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
+  /* pconnection_t is a pn_class instance so we can attach it to the pn_connection_t and
+     it will be finalized when the pn_connection_t is freed.
+  */
+  pconnection_t *pc =
+    (pconnection_t *) pn_class_new(&pconnection_class, sizeof(pconnection_t));
+
   if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
     return NULL;
   }
@@ -254,8 +278,9 @@ static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool ser
     pn_transport_set_server(pc->driver.transport);
   }
   pn_record_t *r = pn_connection_attachments(pc->driver.connection);
-  pn_record_def(r, PN_PROACTOR, PN_VOID);
+  pn_record_def(r, PN_PROACTOR, &pconnection_class);
   pn_record_set(r, PN_PROACTOR, pc);
+  pn_decref(pc);                /* Will be deleted when the connection is */
   return pc;
 }
 
@@ -295,16 +320,20 @@ static void leader_count(pn_proactor_t *p, int change) {
   uv_mutex_unlock(&p->lock);
 }
 
-static void pconnection_free(pconnection_t *pc) {
-  pn_connection_driver_destroy(&pc->driver);
-  free(pc);
-}
-
 static void pn_listener_free(pn_listener_t *l);
 
 /* Final close event for for a pconnection_t, closes the timer */
 static void on_close_pconnection_final(uv_handle_t *h) {
-  pconnection_free((pconnection_t*)h->data);
+  /* If the life of the pn_connection_t has been extended with reference counts
+     we want the pconnection_t to have the same lifespan so calls to pn_connection_wake
+     will be valid (but no-ops)
+  */
+  pconnection_t *pc = (pconnection_t*)h->data;
+  /* Break circular references */
+  pn_incref(pc);                /* Don't let driver_destroy free pc */
+  pn_connection_driver_destroy(&pc->driver);
+  pn_decref(pc);
+  /* Now pc is freed iff the connection is, otherwise it remains till the connection is freed. */
 }
 
 static void uv_safe_close(uv_handle_t *h, uv_close_cb cb) {
@@ -361,9 +390,9 @@ static void listener_error(pn_listener_t *l, int err, const char* what) {
 
 static void psocket_error(psocket_t *ps, int err, const char* what) {
   if (ps->is_conn) {
-    pconnection_error(as_pconnection(ps), err, "initialization");
+    pconnection_error(as_pconnection(ps), err, what);
   } else {
-    listener_error(as_listener(ps), err, "initialization");
+    listener_error(as_listener(ps), err, what);
   }
 }
 
@@ -564,9 +593,9 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
 }
 
 static void on_stopping(uv_handle_t* h, void* v) {
-  /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
+  /* Mark all sockets with an error, pn_proactor_free will clear the resulting events */
   if (h->type == UV_TCP) {
-    uv_safe_close(h, on_close_psocket);
+    psocket_error((psocket_t*)h->data, UV_ESHUTDOWN, "proactor free");
   }
 }
 
@@ -628,13 +657,15 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
   return NULL;
 }
 
-/* Check and reset the wake flag */
-static bool check_wake(pconnection_t *pc) {
+/* Check wake state and generate WAKE event if needed */
+static void check_wake(pconnection_t *pc) {
   uv_mutex_lock(&pc->lock);
-  bool wake = pc->wake;
-  pc->wake = false;
+  if (pc->wake == W_PENDING) {
+    pn_connection_t *c = pc->driver.connection;
+    pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+    pc->wake = W_NONE;
+  }
   uv_mutex_unlock(&pc->lock);
-  return wake;
 }
 
 /* Process a pconnection, return true if it has events for a worker thread */
@@ -648,14 +679,13 @@ static bool leader_process_pconnection(pconnection_t *pc) {
     /* Start the connection if not already connected */
     leader_connect(pc);
   } else if (pn_connection_driver_finished(&pc->driver)) {
-    /* Close if the connection is finished */
+    uv_mutex_lock(&pc->lock);
+    pc->wake = W_CLOSED;        /* wake() cannot notify anymore */
+    uv_mutex_unlock(&pc->lock);
     uv_safe_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
   } else {
     /* Check for events that can be generated without blocking for IO */
-    if (check_wake(pc)) {
-      pn_connection_t *c = pc->driver.connection;
-      pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
-    }
+    check_wake(pc);
     pn_millis_t next_tick = leader_tick(pc);
     pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
     pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
@@ -857,7 +887,7 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int
 }
 
 pn_proactor_t *pn_proactor() {
-  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(pn_proactor_t));
   p->collector = pn_collector();
   p->batch.next_event = &proactor_batch_next;
   if (!p->collector) return NULL;
@@ -871,12 +901,25 @@ pn_proactor_t *pn_proactor() {
 }
 
 void pn_proactor_free(pn_proactor_t *p) {
+  if (p->count > 0) {
+    uv_walk(&p->loop, on_stopping, NULL); /* Set errors on all sockets */
+    /* Drain all events so sockets can close normally */
+    pn_event_t *e = NULL;
+    do {
+      pn_event_batch_t *eb = pn_proactor_wait(p);
+      e = pn_event_batch_next(eb);
+      while (e && pn_event_type(e) != PN_PROACTOR_INACTIVE) {
+        e = pn_event_batch_next(eb);
+      }
+      pn_proactor_done(p, eb);
+    } while (pn_event_type(e) != PN_PROACTOR_INACTIVE);
+  }
+  /* Close the proactor handles */
   uv_timer_stop(&p->timer);
   uv_safe_close((uv_handle_t*)&p->timer, NULL);
   uv_safe_close((uv_handle_t*)&p->async, NULL);
-  uv_walk(&p->loop, on_stopping, NULL); /* Close all TCP handles */
   while (uv_loop_alive(&p->loop)) {
-    uv_run(&p->loop, UV_RUN_ONCE);       /* Run till all handles closed */
+    uv_run(&p->loop, UV_RUN_NOWAIT); /* Run till all handles closed */
   }
   uv_loop_close(&p->loop);
   uv_mutex_destroy(&p->lock);
@@ -894,10 +937,16 @@ void pn_connection_wake(pn_connection_t* c) {
   /* May be called from any thread */
   pconnection_t *pc = get_pconnection(c);
   if (pc) {
+    bool notify = false;
     uv_mutex_lock(&pc->lock);
-    pc->wake = true;
+    if (pc->wake == W_NONE) {
+      pc->wake = W_PENDING;
+      notify = true;
+    }
     uv_mutex_unlock(&pc->lock);
-    psocket_notify(&pc->psocket);
+    if (notify) {
+      psocket_notify(&pc->psocket);
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/eb12513c/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index beba46e..41d889b 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -193,15 +193,22 @@ static void test_connection_wake(test_t *t) {
   test_port_t port = test_port(localhost);          /* Hold a port */
   pn_proactor_listen(server, pn_listener(), port.host_port, 4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  sock_close(port.sock);
+
   pn_connection_t *c = pn_connection();
+  pn_incref(c);                 /* Keep c alive after proactor frees it */
   pn_proactor_connect(client, c, port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
   pn_connection_wake(c);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  sock_close(port.sock);
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   PROACTOR_TEST_FREE(pts);
+
+  /* The pn_connection_t is still valid so wake is legal but a no-op */
+  pn_connection_wake(c);
+  pn_decref(c);
 }
 
 /* Test that INACTIVE event is generated when last connections/listeners closes. */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org