You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/08 16:55:05 UTC

[1/4] qpid-proton git commit: PROTON-1495: c epoll use strerror_r() instead of unsafe strerror()

Repository: qpid-proton
Updated Branches:
  refs/heads/master cb5996ee5 -> 087b94fae


PROTON-1495: c epoll use strerror_r() instead of unsafe strerror()


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8ef841c5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8ef841c5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8ef841c5

Branch: refs/heads/master
Commit: 8ef841c53dab1762fdbe1bc7058cc290f3a4f439
Parents: cb5996e
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Jun 7 13:22:13 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Jun 7 13:22:13 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c             | 39 +++++++++++++++++---------
 proton-c/src/proactor/libuv.c             |  6 ++--
 proton-c/src/proactor/proactor-internal.c |  5 +++-
 3 files changed, 31 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8ef841c5/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index b6d2bd0..0bb0b10 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -20,12 +20,14 @@
  */
 
 /* Enable POSIX features for pthread.h */
-#ifndef _GNU_SOURCE
-#define _GNU_SOURCE
+#ifndef _DEFAULT_SOURCE
+#define _DEFAULT_SOURCE
 #endif
+/* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */
+#undef _GNU_SOURCE
 
 #include "../core/log_private.h"
-#include "proactor-internal.h"
+#include "./proactor-internal.h"
 
 #include <proton/condition.h>
 #include <proton/connection_driver.h>
@@ -66,12 +68,23 @@
 // looking for pending wakes before a kernel call to epoll_wait(), or there
 // could be several eventfds with random assignment of wakeables.
 
+
+typedef char strerrorbuf[1024];      /* used for pstrerror message buffer */
+
+/* Like strerror_r but provide a default message if strerror_r fails */
+static void pstrerror(int err, strerrorbuf msg) {
+  int e = strerror_r(err, msg, sizeof(strerrorbuf));
+  if (e) snprintf(msg, sizeof(strerrorbuf), "unknown error %d", err);
+}
+
 /* Internal error, no recovery */
-#define EPOLL_FATAL(EXPR, SYSERRNO)                                   \
-  do {                                                                \
-    fprintf(stderr, "epoll proactor failure in %s:%d: %s: %s\n",      \
-            __FILE__, __LINE__ , #EXPR, strerror(SYSERRNO));          \
-    abort();                                                          \
+#define EPOLL_FATAL(EXPR, SYSERRNO)                                     \
+  do {                                                                  \
+    strerrorbuf msg;                                                    \
+    pstrerror((SYSERRNO), msg);                                         \
+    fprintf(stderr, "epoll proactor failure in %s:%d: %s: %s\n",        \
+            __FILE__, __LINE__ , #EXPR, msg);                           \
+    abort();                                                            \
   } while (0)
 
 // ========================================================================
@@ -580,18 +593,16 @@ static pn_event_t *log_event(void* p, pn_event_t *e) {
 }
 
 static void psocket_error(psocket_t *ps, int err, const char* what) {
+  strerrorbuf msg;
+  pstrerror(err, msg);
   if (!ps->listener) {
     pn_connection_driver_t *driver = &psocket_pconnection(ps)->driver;
     pn_connection_driver_bind(driver); /* Bind so errors will be reported */
-    pn_connection_driver_errorf(driver, PNI_IO_CONDITION, "%s %s:%s: %s",
-                                what, ps->host, ps->port,
-                                strerror(err));
+    pni_proactor_set_cond(pn_transport_condition(driver->transport), what, ps->host, ps->port, msg);
     pn_connection_driver_close(driver);
   } else {
     pn_listener_t *l = psocket_listener(ps);
-    pn_condition_format(l->condition, PNI_IO_CONDITION, "%s %s:%s: %s",
-                        what, ps->host, ps->port,
-                        strerror(err));
+    pni_proactor_set_cond(l->condition, what, ps->host, ps->port, msg);
     listener_begin_close(l);
   }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8ef841c5/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index a851c4e..cf3daab 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -455,10 +455,8 @@ static inline void pconnection_bad_connect(pconnection_t *pc, int err) {
 static void pconnection_set_error(pconnection_t *pc, int err, const char* what) {
   pn_connection_driver_t *driver = &pc->driver;
   pn_connection_driver_bind(driver); /* Make sure we are bound so errors will be reported */
-  if (!pn_condition_is_set(pn_transport_condition(driver->transport))) {
-    pni_proactor_set_cond(pn_transport_condition(driver->transport),
-                                what, pc->addr.host , pc->addr.port, uv_strerror(err));
-  }
+  pni_proactor_set_cond(pn_transport_condition(driver->transport),
+                        what, pc->addr.host , pc->addr.port, uv_strerror(err));
 }
 
 /* Set the error condition and close the driver. */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8ef841c5/proton-c/src/proactor/proactor-internal.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/proactor-internal.c b/proton-c/src/proactor/proactor-internal.c
index 3525c71..0c82573 100644
--- a/proton-c/src/proactor/proactor-internal.c
+++ b/proton-c/src/proactor/proactor-internal.c
@@ -72,5 +72,8 @@ static inline const char *nonull(const char *str) { return str ? str : ""; }
 void pni_proactor_set_cond(
   pn_condition_t *cond, const char *what, const char *host, const char *port, const char *msg)
 {
-  pn_condition_format(cond, PNI_IO_CONDITION, "%s - %s %s:%s", msg, what, nonull(host), nonull(port));
+  if (!pn_condition_is_set(cond)) { /* Preserve older error information */
+    pn_condition_format(cond, PNI_IO_CONDITION, "%s - %s %s:%s",
+                        msg, what, nonull(host), nonull(port));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/4] qpid-proton git commit: NO-JIRA: Updated old FIXME comments, fixed racy test.

Posted by ac...@apache.org.
NO-JIRA: Updated old FIXME comments, fixed racy test.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/11fa24db
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/11fa24db
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/11fa24db

Branch: refs/heads/master
Commit: 11fa24dbfbd230c89efe05384aebbb086d645df8
Parents: 8c5a031
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jun 8 11:29:51 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jun 8 12:47:10 2017 -0400

----------------------------------------------------------------------
 examples/cpp/mt/epoll_container.cpp          |  6 +++---
 proton-c/bindings/cpp/src/container_impl.cpp |  2 +-
 proton-c/src/proactor/epoll.c                | 16 ++++++++++++----
 proton-c/src/proactor/libuv.c                |  2 +-
 proton-c/src/tests/proactor.c                |  3 ---
 5 files changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/11fa24db/examples/cpp/mt/epoll_container.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/epoll_container.cpp b/examples/cpp/mt/epoll_container.cpp
index 7646673..5643fcc 100644
--- a/examples/cpp/mt/epoll_container.cpp
+++ b/examples/cpp/mt/epoll_container.cpp
@@ -140,9 +140,9 @@ class epoll_container : public proton::io::container_impl_base {
         std::atomic<int> count_;
     };
 
-     // FIXME aconway 2016-06-07: Unfinished
-    void schedule(proton::duration, std::function<void()>) OVERRIDE { throw std::logic_error("FIXME"); }
-    void schedule(proton::duration, proton::void_function0&) OVERRIDE { throw std::logic_error("FIXME"); }
+     // TODO aconway 2016-06-07: Unfinished
+    void schedule(proton::duration, std::function<void()>) OVERRIDE { throw std::logic_error("not implemented"); }
+    void schedule(proton::duration, proton::void_function0&) OVERRIDE { throw std::logic_error("not implemented"); }
     atomic_link_namer link_namer;
 
   private:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/11fa24db/proton-c/bindings/cpp/src/container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp
index d58fc09..e0851a9 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -161,7 +161,7 @@ container::impl::~impl() {
         close_acceptor(i->second);
 }
 
-// FIXME aconway 2016-06-07: this is not thread safe. It is sufficient for using
+// TODO aconway 2016-06-07: this is not thread safe. It is sufficient for using
 // default_container::schedule() inside a handler but not for inject() from
 // another thread.
 bool event_loop::impl::inject(void_function0& f) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/11fa24db/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 69954d6..a0f6519 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -19,9 +19,9 @@
  *
  */
 
-/* Enable POSIX features for pthread.h */
-#ifndef _DEFAULT_SOURCE
-#define _DEFAULT_SOURCE
+/* Enable POSIX features beyond c99 for modern pthread and standard strerror_r() */
+#ifndef _POSIX_C_SOURCE
+#define _POSIX_C_SOURCE 200809L
 #endif
 /* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */
 #undef _GNU_SOURCE
@@ -1251,7 +1251,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
     }
     assert(len > 0);            /* guaranteed by getaddrinfo */
     l->psockets = (psocket_t*)calloc(len, sizeof(psocket_t));
-    assert(l->psockets);      /* FIXME aconway 2017-05-05: memory safety */
+    assert(l->psockets);      /* TODO aconway 2017-05-05: memory safety */
     l->psockets_size = 0;
     /* Find working listen addresses */
     for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
@@ -1941,6 +1941,14 @@ const pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t) {
   return pc ? &pc->remote : NULL;
 }
 
+#ifndef NI_MAXHOST
+# define NI_MAXHOST 1025
+#endif
+
+#ifndef NI_MAXSERV
+# define NI_MAXSERV 32
+#endif
+
 int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) {
   char host[NI_MAXHOST];
   char port[NI_MAXSERV];

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/11fa24db/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 146cd7a..7460194 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -1129,7 +1129,7 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) {
 
 void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
   pconnection_t *pc = pconnection(p, c, false);
-  assert(pc);                                  /* FIXME aconway 2017-03-31: memory safety */
+  assert(pc);                                  /* TODO aconway 2017-03-31: memory safety */
   pn_connection_open(pc->driver.connection);   /* Auto-open */
   parse_addr(&pc->addr, addr);
   work_start(&pc->work);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/11fa24db/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 2764f79..c41110b 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -691,9 +691,6 @@ static void test_ssl(test_t *t) {
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_COND_EMPTY(t, last_condition);
 
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-
   PROACTOR_TEST_FREE(pts);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/4] qpid-proton git commit: PROTON-1495: c epoll proactor handling file descriptor shortage

Posted by ac...@apache.org.
PROTON-1495: c epoll proactor handling file descriptor shortage

Previously if the proactor ran out of file descriptors during pn_listener_accept the
listener closed, and could not accept further connections even when FDs became
free.

Now the affected listener(s) are put into "overflow" mode and disabled until
the proactor closes an FD, at which point it re-enables overflowing listeners.

The error is reported as a PN_TRANSPORT_ERROR on the connection that the
application attempted to use for pn_listener_accept(). Clients will experience a
delay and/or be disconnected while the listener is in overflow mode.

NOTE: we should also have a timeout to re-try overflow listeners even if the
proactor has not closed any file descriptors, in case other parts of the
application have. This is not yet done.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/087b94fa
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/087b94fa
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/087b94fa

Branch: refs/heads/master
Commit: 087b94faef6f0bc3fc2eac50ba0d2c2b03d82fbd
Parents: 11fa24d
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Jun 5 14:14:01 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jun 8 12:52:53 2017 -0400

----------------------------------------------------------------------
 CMakeLists.txt                    |  10 +++
 examples/CMakeLists.txt           |   9 --
 examples/c/proactor/broker.c      |  10 +--
 proton-c/include/proton/event.h   |   1 -
 proton-c/src/proactor/epoll.c     | 160 ++++++++++++++++++++++-----------
 proton-c/src/tests/CMakeLists.txt |  12 +++
 proton-c/src/tests/fdlimit.py     |  85 ++++++++++++++++++
 7 files changed, 219 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/087b94fa/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 294fd03..7fe25e3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -170,6 +170,16 @@ if (ENABLE_VALGRIND)
   endif ()
 endif (ENABLE_VALGRIND)
 
+# Set result to a native search path - used by examples and binding tests.
+# args after result are directories or search paths.
+macro(set_search_path result)
+  set(${result} ${ARGN})
+  if (UNIX)
+    string(REPLACE ";" ":" ${result} "${${result}}") # native search path separators.
+  endif()
+  file(TO_NATIVE_PATH "${${result}}" ${result}) # native slash separators
+endmacro()
+
 add_subdirectory(proton-c)
 add_subdirectory(examples)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/087b94fa/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 8a8327a..160647d 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -20,15 +20,6 @@
 set (Proton_DIR ${CMAKE_CURRENT_SOURCE_DIR})
 set (ProtonCpp_DIR ${CMAKE_CURRENT_SOURCE_DIR})
 
-# Set result to a native search path
-macro(set_search_path result)  # args after result are directories or search paths.
-  set(${result} ${ARGN})
-  if (UNIX)
-    string(REPLACE ";" ":" ${result} "${${result}}") # native search path separators.
-  endif()
-  file(TO_NATIVE_PATH "${${result}}" ${result}) # native slash separators
-endmacro()
-
 # Add the tools directory for the 'proctest' module
 set_search_path(EXAMPLE_PYTHONPATH "${CMAKE_SOURCE_DIR}/tools/py" "$ENV{PYTHON_PATH}")
 set(EXAMPLE_ENV "PYTHONPATH=${EXAMPLE_PYTHONPATH}")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/087b94fa/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index d9285db..e0d9672 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -251,9 +251,7 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {
     fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
             pn_condition_get_name(cond), pn_condition_get_description(cond));
-    pn_connection_t *c = pn_event_connection(e);
-    if (c) pn_connection_close(c); /* It might be a listener event */
-    exit_code = 1;
+    exit_code = 1;              /* Remeber there was an unexpected error */
   }
 }
 
@@ -282,6 +280,7 @@ static void handle(broker_t* b, pn_event_t* e) {
      pn_transport_t *t = pn_connection_transport(c);
      pn_transport_require_auth(t, false);
      pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
+     break;
    }
    case PN_CONNECTION_REMOTE_OPEN: {
      pn_connection_open(pn_event_connection(e)); /* Complete the open */
@@ -337,13 +336,12 @@ static void handle(broker_t* b, pn_event_t* e) {
    }
 
    case PN_TRANSPORT_CLOSED:
-    connection_unsub(b, pn_event_connection(e));
     check_condition(e, pn_transport_condition(pn_event_transport(e)));
+    connection_unsub(b, pn_event_connection(e));
     break;
 
    case PN_CONNECTION_REMOTE_CLOSE:
     check_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
-    connection_unsub(b, pn_event_connection(e));
     pn_connection_close(pn_event_connection(e));
     break;
 
@@ -405,7 +403,7 @@ int main(int argc, char **argv) {
   const char *host = (argc > i) ? argv[i++] : "";
   const char *port = (argc > i) ? argv[i++] : "amqp";
 
-  /* Listenf on addr */
+  /* Listen on addr */
   char addr[PN_MAX_ADDR];
   pn_proactor_addr(addr, sizeof(addr), host, port);
   pn_proactor_listen(b.proactor, pn_listener(), addr, 16);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/087b94fa/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 1e28a23..06e9c5f 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -346,7 +346,6 @@ typedef enum {
    * Events of this type point to the @ref pn_listener_t.
    */
   PN_LISTENER_OPEN
-
 } pn_event_type_t;
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/087b94fa/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index a0f6519..1f10ca0 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -391,6 +391,9 @@ struct pn_proactor_t {
   pcontext_t *wake_list_last;
   // Interrupts have a dedicated eventfd because they must be async-signal safe.
   int interruptfd;
+  // If the process runs out of file descriptors, disarm listeners temporarily and save them here.
+  pn_listener_t *overflow;
+  pmutex overflow_mutex;
 };
 
 static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
@@ -528,6 +531,7 @@ struct pn_listener_t {
   psocket_t *acceptable, *accepted;
   bool close_dispatched;
   bool armed;
+  pn_listener_t *overflow;       /* Next overflowed listener */
 };
 
 
@@ -623,6 +627,48 @@ static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
     EPOLL_FATAL("arming polled file descriptor", errno);
 }
 
+// Add an overflowing listener to the overflow list. Called with listener context lock held.
+static void listener_set_overflow(pn_listener_t *l) {
+  pn_proactor_t *p = l->psockets[0].proactor;
+  lock(&p->overflow_mutex);
+  l->overflow = p->overflow;
+  p->overflow = l;
+  unlock(&p->overflow_mutex);
+}
+
+static const int dummy__ = 0;
+static pn_listener_t * const NO_OVERFLOW = (pn_listener_t*)&dummy__; /* Bogus pointer */
+
+/* TODO aconway 2017-06-08: we should also call proactor_rearm_overflow after a fixed delay,
+   even if the proactor has not freed any file descriptors, since other parts of the process
+   might have*/
+
+// Activate overflowing listeners, called when there may be available file descriptors.
+static void proactor_rearm_overflow(pn_proactor_t *p) {
+  lock(&p->overflow_mutex);
+  pn_listener_t *l = p->overflow;
+  p->overflow = NULL;
+  unlock(&p->overflow_mutex);
+  while (l) {
+    lock(&l->context.mutex);
+    rearm(l->accepted->proactor, &l->accepted->epoll_io);
+    l->armed = true;
+    l->accepted = NULL;
+    pn_listener_t *next = l->overflow;
+    l->overflow = NO_OVERFLOW;
+    unlock(&l->context.mutex);
+    l = next;
+  }
+}
+
+// Close an FD and rearm overflow listeners
+static int pclosefd(pn_proactor_t *p, int fd) {
+  int err = close(fd);
+  if (!err) proactor_rearm_overflow(p);
+  return err;
+}
+
+
 // ========================================================================
 // pconnection
 // ========================================================================
@@ -639,10 +685,8 @@ static void pconnection_finalize(void *vp_pconnection) {
   pcontext_finalize(&pc->context);
 }
 
-
 static const pn_class_t pconnection_class = PN_CLASS(pconnection);
 
-
 static void pconnection_tick(pconnection_t *pc);
 
 static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr)
@@ -652,10 +696,6 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
   if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
     return NULL;
   }
-  if (!ptimer_init(&pc->timer, &pc->psocket)) {
-    perror("timer setup failure");
-    abort();
-  }
   pcontext_init(&pc->context, PCONNECTION, p, pc);
   psocket_init(&pc->psocket, p, NULL, addr);
   pc->new_events = 0;
@@ -679,6 +719,12 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
   pn_record_t *r = pn_connection_attachments(pc->driver.connection);
   pn_record_def(r, PN_PROACTOR, &pconnection_class);
   pn_record_set(r, PN_PROACTOR, pc);
+
+  if (!ptimer_init(&pc->timer, &pc->psocket)) {
+    psocket_error(&pc->psocket, errno, "timer setup");
+    pc->disconnected = true;    /* Already failed */
+  }
+
   pn_decref(pc);                /* Will be deleted when the connection is */
   return pc;
 }
@@ -704,7 +750,7 @@ static void pconnection_final_free(pconnection_t *pc) {
 static void pconnection_cleanup(pconnection_t *pc) {
   stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
   if (pc->psocket.sockfd != -1)
-    close(pc->psocket.sockfd);
+    pclosefd(pc->psocket.proactor, pc->psocket.sockfd);
   stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
   ptimer_finalize(&pc->timer);
   lock(&pc->context.mutex);
@@ -753,8 +799,9 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
   pn_event_t *e = pn_connection_driver_next_event(&pc->driver);
   if (!e && pc->hog_count < HOG_MAX) {
-    pconnection_process(pc, 0, false, true);  // top up
-    e = pn_connection_driver_next_event(&pc->driver);
+    if (pconnection_process(pc, 0, false, true)) {
+      e = pn_connection_driver_next_event(&pc->driver);
+    }
   }
   return e;
 }
@@ -776,7 +823,7 @@ static inline bool pconnection_wclosed(pconnection_t  *pc) {
    rearm(EPOLLHUP | EPOLLERR | EPOLLONESHOT) and leaves doubt that the
    EPOLL_CTL_DEL can prevent a parallel HUP/ERR error notification during
    close/shutdown.  Let read()/write() return 0 or -1 to trigger cleanup logic.
- */
+*/
 static bool pconnection_rearm_check(pconnection_t *pc) {
   if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
     return false;
@@ -945,9 +992,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
       else
         pconnection_connected_lh(pc); /* Non error event means we are connected */
       if (pc->new_events & EPOLLOUT)
-          pc->write_blocked = false;
+        pc->write_blocked = false;
       if (pc->new_events & EPOLLIN)
-          pc->read_blocked = false;
+        pc->read_blocked = false;
     }
     pc->current_arm = 0;
     pc->new_events = 0;
@@ -1153,14 +1200,20 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
   pn_connection_open(pc->driver.connection); /* Auto-open */
 
   bool notify = false;
-  int gai_error = pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo);
-  if (!gai_error) {
-    pc->ai = pc->addrinfo;
-    pconnection_maybe_connect_lh(pc); /* Start connection attempts */
-    notify = pc->disconnected;
+
+  if (pc->disconnected) {
+    notify = wake(&pc->context);    /* Error during initialization */
   } else {
-    psocket_gai_error(&pc->psocket, gai_error, "connect to ");
-    notify = wake(&pc->context);
+    int gai_error = pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo);
+    if (!gai_error) {
+      pn_connection_open(pc->driver.connection); /* Auto-open */
+      pc->ai = pc->addrinfo;
+      pconnection_maybe_connect_lh(pc); /* Start connection attempts */
+      notify = pc->disconnected;
+    } else {
+      psocket_gai_error(&pc->psocket, gai_error, "connect to ");
+      notify = wake(&pc->context);
+    }
   }
   unlock(&pc->context.mutex);
   if (notify) wake_notify(&pc->context);
@@ -1236,6 +1289,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   lock(&l->context.mutex);
   l->context.proactor = p;;
   l->backlog = backlog;
+  l->overflow = NO_OVERFLOW;
 
   char addr_buf[PN_MAX_ADDR];
   const char *host, *port;
@@ -1336,7 +1390,7 @@ static void listener_begin_close(pn_listener_t* l) {
       psocket_t *ps = &l->psockets[i];
       if (ps->sockfd >= 0) {
         stop_polling(&ps->epoll_io, ps->proactor->epollfd);
-        close(ps->sockfd);
+        pclosefd(l->psockets[0].proactor, ps->sockfd);
       }
     }
     pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
@@ -1415,17 +1469,15 @@ static void listener_done(pn_listener_t *l) {
       pn_listener_free(l);
       return;
     }
-  } else {
-    if (listener_has_event(l))
-      notify = wake(&l->context);
-    else {
-      /* Don't rearm until the current socket is accepted */
-      if (!l->context.closing && !l->armed && !l->acceptable && l->accepted) {
-        rearm(l->accepted->proactor, &l->accepted->epoll_io);
-        l->armed = true;
-        l->accepted = NULL;
-      }
-    }
+  } else if (listener_has_event(l)) {
+    notify = wake(&l->context);
+  } else if (l->overflow == NO_OVERFLOW &&
+             !l->context.closing && !l->armed && !l->acceptable && l->accepted)
+  {
+    /* Don't rearm until the current socket is accepted */
+    rearm(l->accepted->proactor, &l->accepted->epoll_io);
+    l->armed = true;
+    l->accepted = NULL;
   }
   unlock(&l->context.mutex);
   if (notify) wake_notify(&l->context);
@@ -1456,37 +1508,41 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   pconnection_t *pc = new_pconnection_t(l->psockets[0].proactor, c, true, "");
   assert(pc);  // TODO: memory safety
   int err = 0;
+  int newfd = -1;
+  bool need_done = false;
 
   lock(&l->context.mutex);
   proactor_add(&pc->context);
-  if (l->context.closing)
+  if (l->context.closing) {
     err = EBADF;
-  else if (l->acceptable == 0) {
-      err = EAGAIN;
+  } else if (l->acceptable == 0) {
+    err = EAGAIN;
+  } else {
+    l->accepted = l->acceptable;
+    l->acceptable = 0;
+    newfd = accept(l->accepted->sockfd, NULL, 0);
+    if (newfd < 0) err = errno;
   }
-
   if (err) {
-    psocket_error(&l->psockets[0], errno, "listener accepting from");
-    unlock(&l->context.mutex);
-    return;
-  }
-  psocket_t *ps = l->accepted = l->acceptable;
-  l->acceptable = 0;
-
-  int newfd = accept(ps->sockfd, NULL, 0);
-  if (newfd < 0) {
-    err = errno;
-    psocket_error(&pc->psocket, err, "accepting from");
-    psocket_error(ps, err, "accepting from");
-  } else {
+    lock(&pc->context.mutex);
+    psocket_error(&pc->psocket, err, "accepting from"); /* Always signal error on the connection */
+    pconnection_begin_close(pc);
+    need_done = true;
+    unlock(&pc->context.mutex);
+    if (err == EMFILE || err == ENFILE) { /* Out of FDs does not close the listener */
+      listener_set_overflow(l);
+    } else {
+      psocket_error(l->accepted, err, "accepting from");
+    }
+  } else {                      /* No errors */
     lock(&pc->context.mutex);
     configure_socket(newfd);
     pc->psocket.sockfd = newfd;
     pconnection_start(pc);
     unlock(&pc->context.mutex);
   }
-
   unlock(&l->context.mutex);
+  if (need_done) pconnection_done(pc);
 }
 
 
@@ -1964,7 +2020,7 @@ int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) {
 }
 
 pn_millis_t pn_proactor_now(void) {
-    struct timespec t;
-    clock_gettime(CLOCK_MONOTONIC, &t);
-    return t.tv_sec*1000 + t.tv_nsec/1000000;
+  struct timespec t;
+  clock_gettime(CLOCK_MONOTONIC, &t);
+  return t.tv_sec*1000 + t.tv_nsec/1000000;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/087b94fa/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index 14f353f..f2edef9 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -53,6 +53,18 @@ pn_add_c_test (c-reactor-tests reactor.c)
 pn_add_c_test (c-event-tests event.c)
 pn_add_c_test (c-data-tests data.c)
 pn_add_c_test (c-condition-tests condition.c)
+
 if(HAS_PROACTOR)
   pn_add_c_test (c-proactor-tests proactor.c)
+
+  if(WIN32)
+    set(path "$<TARGET_FILE_DIR:proactor-broker>;$<TARGET_FILE_DIR:qpid-proton>")
+  else(WIN32)
+    set(path "${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/examples/c/proactor:$ENV{PATH}")
+  endif(WIN32)
+  # Add the tools directory for the 'proctest' module
+  set_search_path(pypath "${CMAKE_SOURCE_DIR}/tools/py" "$ENV{PYTHON_PATH}")
+
+  add_test(NAME c-fdlimit-tests COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/fdlimit.py)
+  set_tests_properties(c-fdlimit-tests PROPERTIES ENVIRONMENT "PATH=${path};PYTHONPATH=${pypath}")
 endif(HAS_PROACTOR)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/087b94fa/proton-c/src/tests/fdlimit.py
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/fdlimit.py b/proton-c/src/tests/fdlimit.py
new file mode 100644
index 0000000..28fdecf
--- /dev/null
+++ b/proton-c/src/tests/fdlimit.py
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proctest import *
+
+class LimitedBroker(object):
+    def __init__(self, test, fdlimit):
+        self.test = test
+        self.fdlimit = fdlimit
+
+    def __enter__(self):
+        with TestPort() as tp:
+            self.port = str(tp.port)
+            self.proc = self.test.proc(['prlimit', '-n%d' % self.fdlimit, 'broker', '', self.port],
+                                       skip_valgrind=True)
+            self.proc.wait_re("listening")
+            return self
+
+    def __exit__(self, *args):
+        b = getattr(self, "proc")
+        if b:
+            if b.poll() !=  None: # Broker crashed
+                raise ProcError(b, "broker crash")
+            b.kill()
+
+# Check if we can run prlimit to control resources
+try:
+    Proc(["prlimit"]).wait_exit()
+    has_prlimit = True
+except:
+    has_prlimit = False
+
+class FdLimitTest(ProcTestCase):
+
+    def setUp(self):
+        global has_prlimit
+        if not has_prlimit:
+            self.skipTest("prlimit not available")
+        super(FdLimitTest, self).setUp()
+
+    def test_fd_limit_broker(self):
+        """Check behaviour when running out of file descriptors on accept"""
+        # Not too many FDs but not too few either, some are used for system purposes.
+        fdlimit = 256
+        with LimitedBroker(self, fdlimit) as b:
+            receivers = []
+            # Start enough receivers to use all FDs, make sure the broker logs an error
+            for i in xrange(fdlimit+1):
+                receivers.append(self.proc(["receive", "", b.port, str(i)]))
+
+            # Note: libuv silently swallows EMFILE/ENFILE errors so there is no error reporting.
+            # The epoll proactor will close the user's connection with the EMFILE/ENFILE error
+            if "TRANSPORT_CLOSED" in b.proc.out:
+                self.assertIn("open files", b.proc.out)
+
+            # All FDs are now in use, send attempt should fail or hang
+            self.assertIn(self.proc(["send", "", b.port, "x"]).poll(), [1, None])
+
+            # Kill receivers to free up FDs
+            for r in receivers:
+                r.kill()
+            for r in receivers:
+                r.wait_exit(expect=None)
+            # send/receive should succeed now
+            self.assertIn("10 messages sent", self.proc(["send", "", b.port]).wait_exit())
+            self.assertIn("10 messages received", self.proc(["receive", "", b.port]).wait_exit())
+
+if __name__ == "__main__":
+    main()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/4] qpid-proton git commit: PROTON-1495: c epoll use gai_strerror for getaddrinfo errors.

Posted by ac...@apache.org.
PROTON-1495: c epoll use gai_strerror for getaddrinfo errors.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8c5a031c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8c5a031c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8c5a031c

Branch: refs/heads/master
Commit: 8c5a031c23cac50b406a9d7f40b45ce44bd68a5d
Parents: 8ef841c
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Jun 7 13:41:26 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Jun 7 13:44:48 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 38 +++++++++++++++++++++++++-------------
 proton-c/src/proactor/libuv.c |  2 +-
 proton-c/src/tests/proactor.c | 16 +++++++++++++++-
 3 files changed, 41 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8c5a031c/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 0bb0b10..69954d6 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -592,9 +592,7 @@ static pn_event_t *log_event(void* p, pn_event_t *e) {
   return e;
 }
 
-static void psocket_error(psocket_t *ps, int err, const char* what) {
-  strerrorbuf msg;
-  pstrerror(err, msg);
+static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
   if (!ps->listener) {
     pn_connection_driver_t *driver = &psocket_pconnection(ps)->driver;
     pn_connection_driver_bind(driver); /* Bind so errors will be reported */
@@ -607,6 +605,16 @@ static void psocket_error(psocket_t *ps, int err, const char* what) {
   }
 }
 
+static void psocket_error(psocket_t *ps, int err, const char* what) {
+  strerrorbuf msg;
+  pstrerror(err, msg);
+  psocket_error_str(ps, msg, what);
+}
+
+static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) {
+  psocket_error_str(ps, gai_strerror(gai_err), what);
+}
+
 static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
   struct epoll_event ev;
   ev.data.ptr = ee;
@@ -991,7 +999,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
       else if (errno == EWOULDBLOCK)
         pc->read_blocked = true;
       else if (!(errno == EAGAIN || errno == EINTR)) {
-        psocket_error(&pc->psocket, errno, pc->disconnected ? "Disconnected" : "on read from");
+        psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on read from");
       }
     }
   }
@@ -1145,12 +1153,13 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
   pn_connection_open(pc->driver.connection); /* Auto-open */
 
   bool notify = false;
-  if (!pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo)) {
+  int gai_error = pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo);
+  if (!gai_error) {
     pc->ai = pc->addrinfo;
     pconnection_maybe_connect_lh(pc); /* Start connection attempts */
     notify = pc->disconnected;
   } else {
-    psocket_error(&pc->psocket, errno, "connect to ");
+    psocket_gai_error(&pc->psocket, gai_error, "connect to ");
     notify = wake(&pc->context);
   }
   unlock(&pc->context.mutex);
@@ -1233,7 +1242,8 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   pni_parse_addr(addr, addr_buf, PN_MAX_ADDR, &host, &port);
 
   struct addrinfo *addrinfo = NULL;
-  if (!pgetaddrinfo(host, port, AI_PASSIVE | AI_ALL, &addrinfo)) {
+  int gai_err = pgetaddrinfo(host, port, AI_PASSIVE | AI_ALL, &addrinfo);
+  if (!gai_err) {
     /* Count addresses, allocate enough space for sockets */
     size_t len = 0;
     for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
@@ -1273,10 +1283,13 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   bool notify = wake(&l->context);
 
   if (l->psockets_size == 0) { /* All failed, create dummy socket with an error */
-    int err = errno;
     l->psockets = (psocket_t*)calloc(sizeof(psocket_t), 1);
     psocket_init(l->psockets, p, l, addr);
-    psocket_error(l->psockets, err, "listen on");
+    if (gai_err) {
+      psocket_gai_error(l->psockets, gai_err, "listen on");
+    } else {
+      psocket_error(l->psockets, errno, "listen on");
+    }
   }
   proactor_add(&l->context);
   unlock(&l->context.mutex);
@@ -1453,8 +1466,7 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   }
 
   if (err) {
-    /* Error on one socket closes the entire listener */
-    psocket_error(&l->psockets[0], errno, "listener state on accept");
+    psocket_error(&l->psockets[0], errno, "listener accepting from");
     unlock(&l->context.mutex);
     return;
   }
@@ -1464,8 +1476,8 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   int newfd = accept(ps->sockfd, NULL, 0);
   if (newfd < 0) {
     err = errno;
-    psocket_error(&pc->psocket, err, "failed initialization on accept");
-    psocket_error(ps, err, "accept");
+    psocket_error(&pc->psocket, err, "accepting from");
+    psocket_error(ps, err, "accepting from");
   } else {
     lock(&pc->context.mutex);
     configure_socket(newfd);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8c5a031c/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index cf3daab..146cd7a 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -601,7 +601,7 @@ static bool leader_connect(pconnection_t *pc) {
   int err = pconnection_init(pc);
   if (!err) err = leader_resolve(pc->work.proactor, &pc->addr, false);
   if (err) {
-    pconnection_error(pc, err, "connect resolving");
+    pconnection_error(pc, err, "on connect resolving");
     return true;
   } else {
     try_connect(pc);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8c5a031c/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 19cc7f3..2764f79 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -493,7 +493,7 @@ static void test_errors(test_t *t) {
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t port = test_port(localhost);          /* Hold a port */
 
-  /* Invalid connect/listen parameters */
+  /* Invalid connect/listen service name */
   pn_connection_t *c = pn_connection();
   pn_proactor_connect(client, c, "127.0.0.1:xxx");
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
@@ -507,6 +507,20 @@ static void test_errors(test_t *t) {
   TEST_STR_IN(t, "xxx", pn_condition_get_description(last_condition));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
+  /* Invalid connect/listen host name */
+  c = pn_connection();
+  pn_proactor_connect(client, c, "nosuch.example.com:");
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_STR_IN(t, "nosuch", pn_condition_get_description(last_condition));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  l = pn_listener();
+  pn_proactor_listen(server, l, "nosuch.example.com:", 1);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+  TEST_STR_IN(t, "nosuch", pn_condition_get_description(last_condition));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
   /* Connect with no listener */
   c = pn_connection();
   pn_proactor_connect(client, c, port.host_port);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org