You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/09 01:25:51 UTC

[28/50] [abbrv] qpid-proton git commit: PROTON-1460: C epoll proactor, deterministic socket IO callbacks on close

PROTON-1460: C epoll proactor, deterministic socket IO callbacks on close


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d25089bf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d25089bf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d25089bf

Branch: refs/heads/go1
Commit: d25089bf2834dc4f650697c39d09dfb95c5509a2
Parents: ec1d1a3
Author: Clifford Jansen <cl...@apache.org>
Authored: Wed May 24 23:48:34 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Wed May 24 23:59:41 2017 -0700

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 55 +++++++++++++++++++++++---------------
 1 file changed, 34 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d25089bf/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 2f99cd9..c65fa44 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -682,6 +682,7 @@ static void pconnection_final_free(pconnection_t *pc) {
 
 // call without lock, but only if pconnection_is_final() is true
 static void pconnection_cleanup(pconnection_t *pc) {
+  stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
   if (pc->psocket.sockfd != -1)
     close(pc->psocket.sockfd);
   stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
@@ -698,8 +699,16 @@ static void pconnection_cleanup(pconnection_t *pc) {
 static void pconnection_begin_close(pconnection_t *pc) {
   if (!pc->context.closing) {
     pc->context.closing = true;
-    stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
-    pc->current_arm = 0;
+    if (pc->current_arm != 0 && !pc->new_events) {
+      // Force io callback via hangup
+      if (pc->current_arm != (EPOLLIN | EPOLLOUT)) {
+        pc->current_arm = (EPOLLIN | EPOLLOUT);
+        pc->psocket.epoll_io.wanted = pc->current_arm;;
+        rearm(pc->psocket.proactor, &pc->psocket.epoll_io);
+      }
+      shutdown(pc->psocket.sockfd, SHUT_RDWR);
+    }
+
     pn_connection_driver_close(&pc->driver);
     if (ptimer_shutdown(&pc->timer, pc->timer_armed))
       pc->timer_armed = false;  // disarmed in the sense that the timer will never fire again
@@ -713,6 +722,8 @@ static void pconnection_begin_close(pconnection_t *pc) {
 
 static void pconnection_forced_shutdown(pconnection_t *pc) {
   // Called by proactor_free, no competing threads, no epoll activity.
+  pc->current_arm = 0;
+  pc->new_events = 0;
   pconnection_begin_close(pc);
   // pconnection_process will never be called again.  Zero everything.
   pc->timer_armed = false;
@@ -886,12 +897,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
 
   // Confirmed as working thread.  Review state and unlock ASAP.
 
-  if (pc->context.closing && pconnection_is_final(pc)) {
-    unlock(&pc->context.mutex);
-    pconnection_cleanup(pc);
-    return NULL;
-  }
-
  retry:
 
   if (pc->queued_disconnect) {  // From pn_proactor_disconnect()
@@ -919,28 +924,36 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   if (pc->new_events) {
-    if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
-      pconnection_maybe_connect_lh(pc);
-    else
-      pconnection_connected_lh(pc); /* Non error event means we are connected */
-    if (pc->new_events & EPOLLOUT)
-      pc->write_blocked = false;
-    if (pc->new_events & EPOLLIN)
-      pc->read_blocked = false;
+    if (!pc->context.closing) {
+      if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
+        pconnection_maybe_connect_lh(pc);
+      else
+        pconnection_connected_lh(pc); /* Non error event means we are connected */
+      if (pc->new_events & EPOLLOUT)
+          pc->write_blocked = false;
+      if (pc->new_events & EPOLLIN)
+          pc->read_blocked = false;
+    }
     pc->current_arm = 0;
     pc->new_events = 0;
   }
   bool unarmed = (pc->current_arm == 0);
-  if (!pc->timer_armed) {
+
+  if (pc->context.closing && pconnection_is_final(pc)) {
+    unlock(&pc->context.mutex);
+    pconnection_cleanup(pc);
+    return NULL;
+  }
+
+  if (!pc->timer_armed && !pc->timer.shutting_down) {
     pc->timer_armed = true;  // about to rearm outside the lock
     timer_unarmed = true;    // so we remember
   }
-  bool timer_shutting_down = pc->timer.shutting_down;
 
   unlock(&pc->context.mutex);
   pc->hog_count++; // working context doing work
 
-  if (timer_unarmed && !timer_shutting_down) {
+  if (timer_unarmed) {
     rearm(pc->psocket.proactor, &pc->timer.epoll_io);
     timer_unarmed = false;
   }
@@ -1279,7 +1292,6 @@ void pn_listener_free(pn_listener_t *l) {
   /* Note at this point either the listener has never been used (freed by user)
      or it has been closed, so all its sockets are closed.
   */
-  // TODO: do we need a QPID DeletionManager equivalent to be safe from inbound connection (accept) epoll events?
   if (l) {
     bool can_free = true;
     if (l->collector) pn_collector_free(l->collector);
@@ -1296,6 +1308,7 @@ void pn_listener_free(pn_listener_t *l) {
 }
 
 static void listener_begin_close(pn_listener_t* l) {
+  // TODO: switch to shutdown(sock, SHUT_RD) and wait for HUP callback per listener socket (analogous to pconnection)
   if (!l->context.closing) {
     l->context.closing = true;
     /* Close all listening sockets */
@@ -1338,7 +1351,7 @@ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
   pn_listener_t *l = psocket_listener(ps);
   lock(&l->context.mutex);
   if (events) {
-    l->armed = false;
+    l->armed = false;  // TODO: armed logic should be per socket not per aggregate listener
     if (events & EPOLLRDHUP) {
       /* Calls listener_begin_close which closes all the listener's sockets */
       psocket_error(ps, errno, "listener epoll");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org