You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/19 17:48:35 UTC

[1/5] qpid-proton git commit: NO-JIRA: Use CHECK_COND_DESC instead of CHECK_IN for conditions.

Repository: qpid-proton
Updated Branches:
  refs/heads/master 8410e2e6e -> 3ae6efdf6


NO-JIRA: Use CHECK_COND_DESC instead of CHECK_IN for conditions.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cb0737ae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cb0737ae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cb0737ae

Branch: refs/heads/master
Commit: cb0737aeb47969f9ceefd49d6d67a78cec5e354b
Parents: fcee877
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jun 16 16:49:29 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 19 13:14:08 2017 -0400

----------------------------------------------------------------------
 proton-c/src/tests/proactor.c | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cb0737ae/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index c41110b..3e1598c 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -497,35 +497,35 @@ static void test_errors(test_t *t) {
   pn_connection_t *c = pn_connection();
   pn_proactor_connect(client, c, "127.0.0.1:xxx");
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_STR_IN(t, "xxx", pn_condition_get_description(last_condition));
+  TEST_COND_DESC(t, "xxx", last_condition);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   pn_listener_t *l = pn_listener();
   pn_proactor_listen(server, l, "127.0.0.1:xxx", 1);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  TEST_STR_IN(t, "xxx", pn_condition_get_description(last_condition));
+  TEST_COND_DESC(t, "xxx", last_condition);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   /* Invalid connect/listen host name */
   c = pn_connection();
   pn_proactor_connect(client, c, "nosuch.example.com:");
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_STR_IN(t, "nosuch", pn_condition_get_description(last_condition));
+  TEST_COND_DESC(t, "nosuch", last_condition);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   l = pn_listener();
   pn_proactor_listen(server, l, "nosuch.example.com:", 1);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  TEST_STR_IN(t, "nosuch", pn_condition_get_description(last_condition));
+  TEST_COND_DESC(t, "nosuch", last_condition);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   /* Connect with no listener */
   c = pn_connection();
   pn_proactor_connect(client, c, port.host_port);
   if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts))) {
-    TEST_STR_IN(t, "refused", pn_condition_get_description(last_condition));
+    TEST_COND_DESC(t, "refused", last_condition);
     TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
     sock_close(port.sock);
     PROACTOR_TEST_FREE(pts);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/5] qpid-proton git commit: PROTON-1494: libuv proactor PN_PROACTOR_INACTIVE behavior

Posted by ac...@apache.org.
PROTON-1494: libuv proactor PN_PROACTOR_INACTIVE behavior

Include timeout events in INACTIVE count.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/24092c91
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/24092c91
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/24092c91

Branch: refs/heads/master
Commit: 24092c9105fae64e7c91efdf6eaa2cb487ccf19e
Parents: 37421c4
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jun 16 14:59:32 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 19 13:15:25 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/libuv.c | 51 ++++++++++++++++++++++----------------
 proton-c/src/tests/proactor.c | 33 +++++++++++++++++++++---
 2 files changed, 59 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/24092c91/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 7460194..c0a4d8d 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -240,17 +240,18 @@ struct pn_proactor_t {
 
   /* Protected by lock */
   uv_mutex_t lock;
-  work_queue_t worker_q;      /* ready for work, to be returned via pn_proactor_wait()  */
-  work_queue_t leader_q;      /* waiting for attention by the leader thread */
+  work_queue_t worker_q; /* ready for work, to be returned via pn_proactor_wait()  */
+  work_queue_t leader_q; /* waiting for attention by the leader thread */
   timeout_state_t timeout_state;
   pn_millis_t timeout;
-  size_t count;               /* connection/listener count for INACTIVE events */
+  size_t active;         /* connection/listener count for INACTIVE events */
   pn_condition_t *disconnect_cond; /* disconnect condition */
-  bool disconnect;            /* disconnect requested */
-  bool inactive;
-  bool has_leader;
-  bool batch_working;         /* batch is being processed in a worker thread */
-  bool need_interrupt;        /* Need a PN_PROACTOR_INTERRUPT event */
+
+  bool has_leader;             /* A thread is working as leader */
+  bool disconnect;             /* disconnect requested */
+  bool batch_working;          /* batch is being processed in a worker thread */
+  bool need_interrupt;         /* Need a PN_PROACTOR_INTERRUPT event */
+  bool need_inactive;          /* need INACTIVE event */
 };
 
 
@@ -360,19 +361,22 @@ static inline work_t *batch_work(pn_event_batch_t *batch) {
 }
 
 /* Total count of listener and connections for PN_PROACTOR_INACTIVE */
-static void leader_inc(pn_proactor_t *p) {
+static void add_active(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
-  ++p->count;
+  ++p->active;
   uv_mutex_unlock(&p->lock);
 }
 
-static void leader_dec(pn_proactor_t *p) {
-  uv_mutex_lock(&p->lock);
-  assert(p->count > 0);
-  if (--p->count == 0) {
-    p->inactive = true;
-    notify(p);
+static void remove_active_lh(pn_proactor_t *p) {
+  assert(p->active > 0);
+  if (--p->active == 0) {
+    p->need_inactive = true;
   }
+}
+
+static void remove_active(pn_proactor_t *p) {
+  uv_mutex_lock(&p->lock);
+  remove_active_lh(p);
   uv_mutex_unlock(&p->lock);
 }
 
@@ -394,7 +398,7 @@ static void on_close_pconnection_final(uv_handle_t *h) {
      will be valid, but no-ops.
   */
   pconnection_t *pc = (pconnection_t*)h->data;
-  leader_dec(pc->work.proactor);
+  remove_active(pc->work.proactor);
   pconnection_free(pc);
 }
 
@@ -495,7 +499,7 @@ static int pconnection_init(pconnection_t *pc) {
     }
   }
   if (!err) {
-    leader_inc(pc->work.proactor);
+    add_active(pc->work.proactor);
   } else {
     pconnection_error(pc, err, "initialization");
   }
@@ -638,7 +642,7 @@ static int lsocket(pn_listener_t *l, struct addrinfo *ai) {
 
 /* Listen on all available addresses */
 static void leader_listen_lh(pn_listener_t *l) {
-  leader_inc(l->work.proactor);
+  add_active(l->work.proactor);
   int err = leader_resolve(l->work.proactor, &l->addr, true);
   if (!err) {
     /* Find the working addresses */
@@ -724,7 +728,7 @@ static bool leader_process_listener(pn_listener_t *l) {
 
    case L_CLOSED:              /* Closed, has LISTENER_CLOSE has been processed? */
     if (!pn_collector_peek(l->collector)) {
-      leader_dec(l->work.proactor);
+      remove_active(l->work.proactor);
       closed = true;
     }
   }
@@ -824,8 +828,8 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
 /* Return the next event batch or NULL if no events are available */
 static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
   if (!p->batch_working) {       /* Can generate proactor events */
-    if (p->inactive) {
-      p->inactive = false;
+    if (p->need_inactive) {
+      p->need_inactive = false;
       return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
     }
     if (p->need_interrupt) {
@@ -834,6 +838,7 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
     }
     if (p->timeout_state == TM_FIRED) {
       p->timeout_state = TM_NONE;
+      remove_active_lh(p);
       return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
     }
   }
@@ -1114,6 +1119,7 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_lock(&p->lock);
   p->timeout = t;
   p->timeout_state = TM_REQUEST;
+  ++p->active;
   uv_mutex_unlock(&p->lock);
   notify(p);
 }
@@ -1122,6 +1128,7 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
   if (p->timeout_state != TM_NONE) {
     p->timeout_state = TM_NONE;
+    remove_active_lh(p);
     notify(p);
   }
   uv_mutex_unlock(&p->lock);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/24092c91/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 3e1598c..f6122c1 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -130,7 +130,6 @@ static pn_event_type_t proactor_test_run(proactor_test_t *pts, size_t n) {
   return e;
 }
 
-
 /* Drain and discard outstanding events from an array of proactors */
 static void proactor_test_drain(proactor_test_t *pts, size_t n) {
   while (proactor_test_get(pts, n))
@@ -467,17 +466,45 @@ static void test_inactive(test_t *t) {
   proactor_test_t pts[] =  { { open_wake_handler }, { listen_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+  /* Default test timeout will interfere with the test, cancel it */
+  pn_proactor_cancel_timeout(client);
+  pn_proactor_cancel_timeout(server);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  /* Listen, connect, disconnect */
   proactor_test_listener_t l = proactor_test_listen(&pts[1], localhost);
   pn_connection_t *c = pn_connection();
   pn_proactor_connect(client, c, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   pn_connection_wake(c);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
-  /* expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
+  /* Expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
-  /* server won't be INACTIVE until listener is closed */
+
+  /* Immediate timer generates INACTIVE on client (no connections) */
+  pn_proactor_set_timeout(client, 0);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  /* Connect, set-timer, disconnect */
+  pn_proactor_set_timeout(client, 1000000);
+  c = pn_connection();
+  pn_proactor_connect(client, c, l.port.host_port);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_connection_wake(c);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+  /* Expect TRANSPORT_CLOSED from client and server */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  /* No INACTIVE till timer is cancelled */
+  TEST_CHECK(t, pn_proactor_get(server) == NULL);
+  pn_proactor_cancel_timeout(client);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  /* Server won't be INACTIVE until listener is closed */
   TEST_CHECK(t, pn_proactor_get(server) == NULL);
   pn_listener_close(l.listener);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/5] qpid-proton git commit: PROTON-1504: epoll proactor fix race condition in ENFILE/EMFILE

Posted by ac...@apache.org.
PROTON-1504: epoll proactor fix race condition in ENFILE/EMFILE

If the EMFILE error occured on a timer FD rather than a network socket,
the proactor would crash when trying to re-arm the timer. Fixed to check
for this condition.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fcee8774
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fcee8774
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fcee8774

Branch: refs/heads/master
Commit: fcee87743abdfc626b8ca045dd52da5d3cae9e71
Parents: 8410e2e
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Jun 19 13:10:24 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 19 13:14:08 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 6 +++---
 proton-c/src/tests/fdlimit.py | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fcee8774/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index a173086..54703a5 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -158,7 +158,6 @@ typedef struct ptimer_t {
 
 static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
   pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
-  if (pt->timerfd < 0) return false;
   pmutex_init(&pt->mutex);
   pt->timer_active = false;
   pt->in_doubt = false;
@@ -169,7 +168,7 @@ static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
   pt->epoll_io.type = type;
   pt->epoll_io.wanted = EPOLLIN;
   pt->epoll_io.polling = false;
-  return true;
+  return (pt->timerfd >= 0);
 }
 
 // Call with ptimer lock held
@@ -695,6 +694,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
   pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
   if (!pc) return NULL;
   if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
+    free(pc);
     return NULL;
   }
   pcontext_init(&pc->context, PCONNECTION, p, pc);
@@ -1008,7 +1008,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     return NULL;
   }
 
-  if (!pc->timer_armed && !pc->timer.shutting_down) {
+  if (!pc->timer_armed && !pc->timer.shutting_down && pc->timer.timerfd >= 0) {
     pc->timer_armed = true;  // about to rearm outside the lock
     timer_unarmed = true;    // so we remember
   }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fcee8774/proton-c/src/tests/fdlimit.py
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/fdlimit.py b/proton-c/src/tests/fdlimit.py
index 5f849b3..a31fa09 100644
--- a/proton-c/src/tests/fdlimit.py
+++ b/proton-c/src/tests/fdlimit.py
@@ -34,7 +34,7 @@ class LimitedBroker(object):
     def __exit__(self, *args):
         b = getattr(self, "proc")
         if b:
-            if b.poll() !=  None: # Broker crashed
+            if b.poll() not in [1, None]: # Broker crashed or got expected connection error
                 raise ProcError(b, "broker crash")
             b.kill()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[5/5] qpid-proton git commit: PROTON-1494: epoll proactor PN_PROACTOR_INACTIVE behavior

Posted by ac...@apache.org.
PROTON-1494: epoll proactor PN_PROACTOR_INACTIVE behavior

Issue PN_PROACTOR_INACTIVE only when there are no connections, listeners OR
timeout set.

Updated the doc.

Updated the tests - dropped default timeout as it interferes with tests that check
for TIMEOUT or INACTIVE events, which is most of them.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3ae6efdf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3ae6efdf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3ae6efdf

Branch: refs/heads/master
Commit: 3ae6efdf6f0a099adb7310e1abc7ae057f537933
Parents: 24092c9
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jun 16 16:28:35 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 19 13:34:56 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 80 ++++++++++++++++++++++----------------
 proton-c/src/tests/proactor.c | 15 +++----
 2 files changed, 52 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ae6efdf/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 54703a5..9e8ea4d 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -376,11 +376,12 @@ struct pn_proactor_t {
   epoll_extended_t epoll_interrupt;
   pn_event_batch_t batch;
   size_t disconnects_pending;   /* unfinished proactor disconnects*/
-  bool interrupt;
-  bool inactive;
-  bool timer_expired;
-  bool timer_cancelled;
-  bool timer_armed;
+  // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
+  bool need_interrupt;
+  bool need_inactive;
+  bool need_timeout;
+  bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
+  bool timer_armed; /* timer is armed in epoll */
   bool shutting_down;
   // wake subsystem
   int eventfd;
@@ -1192,6 +1193,19 @@ static int pgetaddrinfo(const char *host, const char *port, int flags, struct ad
   return getaddrinfo(host, port, &hints, res);
 }
 
+static inline bool is_inactive(pn_proactor_t *p) {
+  return (!p->contexts && !p->disconnects_pending && !p->timeout_set && !p->shutting_down);
+}
+
+/* If inactive set need_inactive and return true if the proactor needs a wakeup */
+static bool wait_if_inactive(pn_proactor_t *p) {
+  if (is_inactive(p)) {
+    p->need_inactive = true;
+    return wake(&p->context);
+  }
+  return false;
+}
+
 void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
   pconnection_t *pc = new_pconnection_t(p, c, false, addr);
   assert(pc); // TODO: memory safety
@@ -1201,6 +1215,7 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
   pn_connection_open(pc->driver.connection); /* Auto-open */
 
   bool notify = false;
+  bool notify_proactor = false;
 
   if (pc->disconnected) {
     notify = wake(&pc->context);    /* Error during initialization */
@@ -1214,10 +1229,13 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
     } else {
       psocket_gai_error(&pc->psocket, gai_error, "connect to ");
       notify = wake(&pc->context);
+      notify_proactor = wait_if_inactive(p);
     }
   }
+  /* We need to issue INACTIVE on immediate failure */
   unlock(&pc->context.mutex);
   if (notify) wake_notify(&pc->context);
+  if (notify_proactor) wake_notify(&p->context);
 }
 
 static void pconnection_tick(pconnection_t *pc) {
@@ -1638,18 +1656,20 @@ static bool proactor_update_batch(pn_proactor_t *p) {
   if (proactor_has_event(p))
     return true;
 
-  if (p->timer_expired) {
-    p->timer_expired = false;
+  if (p->need_timeout) {
+    p->need_timeout = false;
+    p->timeout_set = false;
     proactor_add_event(p, PN_PROACTOR_TIMEOUT);
+    p->need_inactive = is_inactive(p);
     return true;
   }
-  if (p->interrupt) {
-    p->interrupt = false;
+  if (p->need_interrupt) {
+    p->need_interrupt = false;
     proactor_add_event(p, PN_PROACTOR_INTERRUPT);
     return true;
   }
-  if (p->inactive) {
-    p->inactive = false;
+  if (p->need_inactive) {
+    p->need_inactive = false;
     proactor_add_event(p, PN_PROACTOR_INACTIVE);
     return true;
   }
@@ -1669,11 +1689,12 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t even
   bool timer_fired = (event == PN_PROACTOR_TIMEOUT) && ptimer_callback(&p->timer) != 0;
   lock(&p->context.mutex);
   if (event == PN_PROACTOR_INTERRUPT) {
-    p->interrupt = true;
+    p->need_interrupt = true;
   } else if (event == PN_PROACTOR_TIMEOUT) {
     p->timer_armed = false;
-    if (timer_fired && !p->timer_cancelled)
-      p->timer_expired = true;
+    if (timer_fired && p->timeout_set) {
+      p->need_timeout = true;
+    }
   } else {
     wake_done(&p->context);
   }
@@ -1708,15 +1729,11 @@ static void proactor_add(pcontext_t *ctx) {
 static bool proactor_remove(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
   lock(&p->context.mutex);
-  bool notify = false;
   bool can_free = true;
   if (ctx->disconnecting) {
     // No longer on contexts list
     if (--ctx->disconnect_ops == 0) {
-      if (--p->disconnects_pending == 0 && !p->contexts) {
-        p->inactive = true;
-        notify = wake(&p->context);
-      }
+      --p->disconnects_pending;
     }
     else                  // procator_disconnect() still processing
       can_free = false;   // this psocket
@@ -1731,14 +1748,11 @@ static bool proactor_remove(pcontext_t *ctx) {
       if (p->contexts)
         p->contexts->prev = NULL;
     }
-    if (ctx->next)
+    if (ctx->next) {
       ctx->next->prev = ctx->prev;
-
-    if (!p->contexts && !p->disconnects_pending && !p->shutting_down) {
-      p->inactive = true;
-      notify = wake(&p->context);
     }
   }
+  bool notify = wait_if_inactive(p);
   unlock(&p->context.mutex);
   if (notify) wake_notify(&p->context);
   return can_free;
@@ -1846,7 +1860,8 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     if (proactor_has_event(p))
       notify = wake(&p->context);
     unlock(&p->context.mutex);
-    if (notify) wake_notify(&p->context);
+    if (notify)
+      wake_notify(&p->context);
     if (rearm_timer)
       rearm(p, &p->timer.epoll_io);
     return;
@@ -1864,10 +1879,10 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   bool notify = false;
   lock(&p->context.mutex);
-  p->timer_cancelled = false;
+  p->timeout_set = true;
   if (t == 0) {
     ptimer_set(&p->timer, 0);
-    p->timer_expired = true;
+    p->need_timeout = true;
     notify = wake(&p->context);
   } else {
     ptimer_set(&p->timer, t);
@@ -1878,10 +1893,12 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
 
 void pn_proactor_cancel_timeout(pn_proactor_t *p) {
   lock(&p->context.mutex);
-  p->timer_cancelled = true;  // stays cancelled until next set_timeout()
-  p->timer_expired = false;
+  p->timeout_set = false;
+  p->need_timeout = false;
   ptimer_set(&p->timer, 0);
+  bool notify = wait_if_inactive(p);
   unlock(&p->context.mutex);
+  if (notify) wake_notify(&p->context);
 }
 
 pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
@@ -1951,10 +1968,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
     if (--ctx->disconnect_ops == 0) {
       do_free = true;
       ctx_notify = false;
-      if (--p->disconnects_pending == 0 && !p->contexts) {
-        p->inactive = true;
-        notify = wake(&p->context);
-      }
+      notify = wait_if_inactive(p);
     } else {
       // If initiating the close, wake the pcontext to do the free.
       if (ctx_notify)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ae6efdf/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index f6122c1..6ae633c 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -33,8 +33,6 @@
 #include <stdlib.h>
 #include <string.h>
 
-static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
-
 static const char *localhost = ""; /* host for connect/listen */
 
 typedef pn_event_type_t (*test_handler_fn)(test_t *, pn_event_t*);
@@ -57,7 +55,6 @@ static void proactor_test_init(proactor_test_t *pts, size_t n, test_t *t) {
     if (!pt->t) pt->t = t;
     if (!pt->proactor) pt->proactor = pn_proactor();
     pt->log_len = 0;
-    pn_proactor_set_timeout(pt->proactor, timeout);
   }
 }
 
@@ -175,14 +172,17 @@ static void test_interrupt_timeout(test_t *t) {
   /* Set an immediate timeout */
   pn_proactor_set_timeout(p, 0);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p)); /* Inactive because timeout expired */
 
   /* Set a (very short) timeout */
-  pn_proactor_set_timeout(p, 10);
+  pn_proactor_set_timeout(p, 1);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
 
   /* Set and cancel a timeout, make sure we don't get the timeout event */
-  pn_proactor_set_timeout(p, 10);
+  pn_proactor_set_timeout(p, 10000000);
   pn_proactor_cancel_timeout(p);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
   TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
 
   pn_proactor_free(p);
@@ -466,11 +466,6 @@ static void test_inactive(test_t *t) {
   proactor_test_t pts[] =  { { open_wake_handler }, { listen_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
-  /* Default test timeout will interfere with the test, cancel it */
-  pn_proactor_cancel_timeout(client);
-  pn_proactor_cancel_timeout(server);
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   /* Listen, connect, disconnect */
   proactor_test_listener_t l = proactor_test_listen(&pts[1], localhost);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/5] qpid-proton git commit: PROTON-1494: update C API doc PN_PROACTOR_INACTIVE event.

Posted by ac...@apache.org.
PROTON-1494: update C API doc PN_PROACTOR_INACTIVE event.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/37421c48
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/37421c48
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/37421c48

Branch: refs/heads/master
Commit: 37421c4894ddfc6187d2cae1406b7cbacebcdc0f
Parents: cb0737a
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jun 16 16:28:46 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 19 13:15:25 2017 -0400

----------------------------------------------------------------------
 proton-c/include/proton/event.h    | 6 ++++--
 proton-c/include/proton/proactor.h | 9 ++++++---
 2 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37421c48/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 06e9c5f..2db0373 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -334,8 +334,10 @@ typedef enum {
   PN_PROACTOR_TIMEOUT,
 
   /**
-   * The proactor is inactive. All listeners and connections are closed and
-   * their events processed, the timeout is expired.
+   * The proactor has become inactive: all listeners and connections were closed
+   * and the timeout (if set) expired or was cancelled. There will be no
+   * further events unless new listeners or connections are opened, or a new
+   * timeout is set (possibly in other threads in a multi-threaded program.)
    *
    * Events of this type point to the @ref pn_proactor_t.
    */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37421c48/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 4e5151a..8d5ff0e 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -141,9 +141,12 @@ PNP_EXTERN void pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *liste
  * Disconnect all connections and listeners belonging to the proactor.
  *
  * @ref PN_LISTENER_CLOSE, @ref PN_TRANSPORT_CLOSED and other @ref proactor_events are
- * generated as usual.  If no new listeners or connections are created, then a
- * @ref PN_PROACTOR_INACTIVE event will be generated when all connections and
- * listeners are disconnected.
+ * generated as usual.
+ *
+ * If no new listeners or connections are created, then a @ref
+ * PN_PROACTOR_INACTIVE event will be generated when all connections and
+ * listeners are disconnected and no timeout is pending - see
+ * pn_proactor_set_timeout() pn_proactor_cancel_timeout()
  *
  * Note the proactor remains active, connections and listeners created after a call to
  * pn_proactor_disconnect() are not affected by it.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org