You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/04/22 20:49:49 UTC

qpid-proton git commit: PROTON-1460: epoll test fixes: log_event, listener setup, EOS detection

Repository: qpid-proton
Updated Branches:
  refs/heads/master d16794ba0 -> 80def6f9a


PROTON-1460: epoll test fixes: log_event, listener setup, EOS detection


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/80def6f9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/80def6f9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/80def6f9

Branch: refs/heads/master
Commit: 80def6f9a5ae4c37b69d5da2b6db650ec1b63821
Parents: d16794b
Author: Clifford Jansen <cl...@apache.org>
Authored: Sat Apr 22 13:49:22 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Sat Apr 22 13:49:22 2017 -0700

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 53 +++++++++++++++++++++++++++++---------
 1 file changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/80def6f9/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 7e2ee2c..126a7e2 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -232,11 +232,16 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
  * connection, each listener, the proactor itself.  The serialization
  * is presented to the application via each associated event batch.
  *
+ * Multiple threads can be trying to do work on a single context
+ * (i.e. socket IO is ready and wakeup at same time). Mutexes are used
+ * to manage contention.  Some vars are only ever touched by one
+ * "working" thread and are accessed without holding the mutex.
+ *
  * Currently internal wakeups (via wake()/wake_notify()) are used to
  * force a context to check if it has work to do.  To minimize trips
- * through the kernel, wake() is a no-op if the context is working.
- * Conversely, a context must never stop working without checking if
- * it has newly arrived work.
+ * through the kernel, wake() is a no-op if the context has a working
+ * thread.  Conversely, a thread must never stop working without
+ * checking if it has newly arrived work.
  *
  * External wake operations, like pn_connection_wake() and
  * pn_proactor_interrupt(), are built on top of the internal wake
@@ -506,6 +511,13 @@ static inline bool proactor_has_event(pn_proactor_t *p) {
   return (p->cached_event || (p->cached_event = pn_collector_next(p->collector)));
 }
 
+static pn_event_t *log_event(void* p, pn_event_t *e) {
+  if (e) {
+    pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
+  }
+  return e;
+}
+
 static void psocket_error(psocket_t *ps, int err, const char* what) {
   if (ps->is_conn) {
     pn_connection_driver_t *driver = &as_pconnection(ps)->driver;
@@ -622,6 +634,7 @@ void pconnection_begin_close(pconnection_t *pc) {
     pc->psocket.closing = true;
     pc->read_closed = pc->write_closed = true;
     stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
+    pc->current_arm = 0;
     pn_connection_driver_close(&pc->driver);
     ptimer_set(&pc->timer, 0);
   }
@@ -856,7 +869,11 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
 
   if (!pc->read_closed) {
     pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
-    if (rbuf.size > 0 && (!pc->read_blocked)) {
+    if (rbuf.size == 0) {
+      if (pn_connection_driver_read_closed(&pc->driver))
+        pc->read_closed = true;
+    }
+    else if (!pc->read_blocked) {
       ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size);
 
       if (n > 0) {
@@ -932,6 +949,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     goto retry;  // TODO: get rid of goto without adding more locking
 
   pc->context.working = false;
+  pc->hog_count = 0;
   bool rearm = pconnection_rearm_check(pc);
 
   unlock(&pc->context.mutex);
@@ -971,6 +989,7 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
   // TODO: check case of proactor shutting down
   lock(&pc->context.mutex);
   proactor_add(&pc->psocket);
+  pn_connection_open(pc->driver.connection); /* Auto-open */
 
   struct addrinfo *ai = NULL;
   int fd = -1;
@@ -1026,6 +1045,12 @@ void pn_connection_wake(pn_connection_t* c) {
   if (notify) wake_notify(&pc->context);
 }
 
+void pn_proactor_release_connection(pn_connection_t *c) {
+  pconnection_t *pc = get_pconnection(c);
+  if (pc) {
+    pn_connection_driver_release_connection(&pc->driver);
+  }
+}
 
 // ========================================================================
 // listener
@@ -1046,6 +1071,8 @@ pn_listener_t *pn_listener() {
       pn_listener_free(l);
       return NULL;
     }
+    pn_proactor_t *unknown = NULL;  // won't know until pn_proactor_listen
+    pcontext_init(&l->context, LISTENER, unknown, l);
   }
   return l;
 }
@@ -1056,12 +1083,15 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   assert(buf);  // TODO:  memory safety
   char *scheme, *user, *pass, *host, *port, *path;
   pni_parse_url(buf, &scheme, &user, &pass, &host, &port, &path);
-  pcontext_init(&l->context, LISTENER, p, l);
+  // TODO: check listener not already listening for this or another proactor
+  lock(&l->context.mutex);
+  l->context.proactor = p;;
   psocket_init(&l->psocket, p, false, host, port);
   l->backlog = backlog;
   proactor_add(&l->psocket);
   /* Always put an OPEN event for symmetry, even if we immediately close with err */
   pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
+  bool notify = wake(&l->context);
 
   struct addrinfo *ai = NULL;
   int fd = -1;
@@ -1076,6 +1106,8 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
             l->psocket.epoll_io.fd = fd;
             l->psocket.epoll_io.wanted = EPOLLIN;
             start_polling(&l->psocket.epoll_io, l->psocket.proactor->epollfd);  // TODO: check for error
+            unlock(&l->context.mutex);
+            if (notify) wake_notify(&l->context);
             free(buf);
             return;
           }
@@ -1083,8 +1115,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   }
 
   psocket_error(&l->psocket, errno, "listen on");
+  unlock(&l->context.mutex);
+  if (notify) wake_notify(&l->context);
   if (ai) freeaddrinfo(ai);
-  wake(&l->context);
   free(buf);
   return;
 }
@@ -1190,7 +1223,7 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   if (e && pn_event_type(e) == PN_LISTENER_CLOSE)
     l->close_dispatched = true;
   unlock(&l->context.mutex);
-  return e;
+  return log_event(l, e);
 }
 
 static void listener_done(pn_listener_t *l) {
@@ -1396,7 +1429,7 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
     e = p->cached_event;
   unlock(&p->context.mutex);
   p->cached_event = NULL;
-  return e;
+  return log_event(p, e);
 }
 
 static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) {
@@ -1711,7 +1744,3 @@ size_t pn_proactor_addr_str(const struct pn_proactor_addr_t* addr, char *buf, si
   }
 }
 
-/* FIXME aconway 2017-04-21: dummy to make test link, needs implementation */
-void pn_proactor_release_connection(pn_connection_t *connection) {
-  abort();
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org