You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/03/06 23:20:30 UTC

[1/4] qpid-proton git commit: PROTON-1422: fix clang compile error from previous commit

Repository: qpid-proton
Updated Branches:
  refs/heads/master 11fe67e30 -> ee29b0918


PROTON-1422: fix clang compile error from previous commit


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6932dae6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6932dae6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6932dae6

Branch: refs/heads/master
Commit: 6932dae68da118b55c0259e97a513e2a3ac974fa
Parents: 11fe67e
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 1 13:56:22 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 1 13:58:32 2017 -0500

----------------------------------------------------------------------
 proton-c/src/core/url-internal.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6932dae6/proton-c/src/core/url-internal.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/url-internal.c b/proton-c/src/core/url-internal.c
index 2cfc177..fed82b4 100644
--- a/proton-c/src/core/url-internal.c
+++ b/proton-c/src/core/url-internal.c
@@ -60,7 +60,7 @@ static void pni_urldecode(const char *src, char *dst)
 void pni_parse_url(char *url, char **scheme, char **user, char **pass, char **host, char **port, char **path)
 {
   if (!url) return;
-  *scheme = *user = *pass = *host = *port = *path = '\0';
+  *scheme = *user = *pass = *host = *port = *path = NULL;
 
   char *slash = strchr(url, '/');
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/4] qpid-proton git commit: PROTON-1403: apply warning compile flags to proactor sources.

Posted by ac...@apache.org.
PROTON-1403: apply warning compile flags to proactor sources.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/16009579
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/16009579
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/16009579

Branch: refs/heads/master
Commit: 1600957917424967a7cb7dd038fecd98f375e051
Parents: c9ee24a
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Mar 6 17:56:11 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Mar 6 17:56:11 2017 -0500

----------------------------------------------------------------------
 proton-c/CMakeLists.txt | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/16009579/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index a6e9ef1..4f3430e 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -105,17 +105,6 @@ else(PN_WINAPI)
   set (pn_selector_impl src/reactor/io/posix/selector.c)
 endif(PN_WINAPI)
 
-# Select proactor impl
-find_package(Libuv)
-if (Libuv_FOUND)
-  set (qpid-proton-proactor src/proactor/libuv.c)
-  set (PROACTOR_LIBS ${Libuv_LIBRARIES})
-  set_source_files_properties (${qpid-proton-proactor} PROPERTIES
-    # Skip COMPILE_LANGUAGE_FLAGS, libuv.h won't compile with --std=c99
-    COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${LTO} "
-  )
-endif()
-
 # Link in SASL if present
 if (SASL_IMPL STREQUAL cyrus)
   set(pn_sasl_impl src/sasl/sasl.c src/sasl/cyrus_sasl.c)
@@ -293,7 +282,6 @@ set(COMPILE_WARNING_FLAGS "${COMPILE_WARNING_FLAGS}" PARENT_SCOPE)
 set(COMPILE_LANGUAGE_FLAGS "${COMPILE_LANGUAGE_FLAGS}" PARENT_SCOPE)
 set(LINK_TIME_OPTIMIZATION "${LTO}" PARENT_SCOPE)
 
-
 if (MSVC)
     set(CMAKE_DEBUG_POSTFIX "d")
     add_definitions(
@@ -482,6 +470,18 @@ set (qpid-proton-include-extra
   include/proton/url.h
 )
 
+# Select proactor sources and build flags
+find_package(Libuv)
+if (Libuv_FOUND)
+  set (qpid-proton-proactor src/proactor/libuv.c)
+  set (PROACTOR_LIBS ${Libuv_LIBRARIES})
+  set_source_files_properties (${qpid-proton-proactor} PROPERTIES
+    # Skip COMPILE_LANGUAGE_FLAGS, libuv.h won't compile with --std=c99
+    COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${LTO} "
+  )
+endif()
+
+
 # note: process bindings after the source lists have been defined so
 # the bindings can reference them
 add_subdirectory(bindings)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/4] qpid-proton git commit: PROTON-1403: libuv.c simplify detach/listen logic, fix a write race

Posted by ac...@apache.org.
PROTON-1403: libuv.c simplify detach/listen logic, fix a write race


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ee29b091
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ee29b091
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ee29b091

Branch: refs/heads/master
Commit: ee29b091829b4aa8bd35428922387b81680d5311
Parents: 1600957
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Mar 6 18:08:59 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Mar 6 18:11:32 2017 -0500

----------------------------------------------------------------------
 proton-c/src/proactor/libuv.c | 205 +++++++++++++++++++------------------
 1 file changed, 105 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ee29b091/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 1e80e68..678b683 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -88,7 +88,6 @@ typedef struct psocket_t {
   /* Protected by proactor.lock */
   struct psocket_t* next;
   bool working;                      /* Owned by a worker thread */
-  void (*action)(struct psocket_t*); /* deferred action for leader */
 
   /* Only used by leader thread when it owns the psocket */
   uv_tcp_t tcp;
@@ -215,7 +214,9 @@ static inline void notify(pn_proactor_t* p) {
 /* Notify that this socket needs attention from the leader at the next opportunity */
 static void psocket_notify(psocket_t *ps) {
   uv_mutex_lock(&ps->proactor->lock);
-  /* Only queue if not working and not already queued */
+  /* If the socket is in use by a worker or is already queued then leave it where it is.
+     It will be processed in pn_proactor_done() or when the queue it is on is processed.
+  */
   if (!ps->working && ps->next == &UNLISTED) {
     push_lh(&ps->proactor->leader_q, ps);
     notify(ps->proactor);
@@ -387,36 +388,11 @@ static int leader_init(psocket_t *ps) {
   return err;
 }
 
-/* Check if a pconnection has work for a worker thread. Called by owning thread. */
-static bool pconnection_needs_work(pconnection_t *pc) {
-  if (!pc->writing) {           /* Can't detach for work while write is pending */
-    /* Check for wake requests */
-    uv_mutex_lock(&pc->lock);
-    bool wake = pc->wake;
-    pc->wake = false;
-    uv_mutex_unlock(&pc->lock);
-    if (wake) {
-      pn_connection_t *c = pc->driver.connection;
-      pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
-    }
-    return pn_connection_driver_has_event(&pc->driver);
-  }
-  return false;
-}
-
-/* Detach a connection from the UV loop so it can be used safely by a worker */
-void pconnection_detach(pconnection_t *pc) {
-  uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
-  uv_timer_stop(&pc->timer);
-  psocket_notify(&pc->psocket);
-}
-
 /* Outgoing connection */
 static void on_connect(uv_connect_t *connect, int err) {
   pconnection_t *pc = (pconnection_t*)connect->data;
-  assert(!pc->psocket.working);
   if (err) pconnection_error(pc, err, "on connect to");
-  pconnection_detach(pc);
+  psocket_notify(&pc->psocket);
 }
 
 /* Incoming connection ready to be accepted */
@@ -475,49 +451,62 @@ static void leader_listen(pn_listener_t *l) {
   /* Always put an OPEN event for symmetry, even if we immediately close with err */
   pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
   uv_mutex_unlock(&l->lock);
-  if (err) listener_error(l, err, "listening on");
+  if (err) {
+    listener_error(l, err, "listening on");
+  }
 }
 
-static bool listener_needs_work(pn_listener_t *l) {
+static bool listener_has_work(pn_listener_t *l) {
   uv_mutex_lock(&l->lock);
-  bool needs_work = pn_collector_peek(l->collector);
+  bool has_work = pn_collector_peek(l->collector);
   uv_mutex_unlock(&l->lock);
-  return needs_work;
+  return has_work;
 }
 
-static bool listener_finished_lh(pn_listener_t *l) {
-  return l->closed && !pn_collector_peek(l->collector) && !l->accept.front;
+static pconnection_t *listener_pop(pn_listener_t *l) {
+  uv_mutex_lock(&l->lock);
+  pconnection_t *pc = (pconnection_t*)pop_lh(&l->accept);
+  uv_mutex_unlock(&l->lock);
+  return pc;
+}
+
+static bool listener_finished(pn_listener_t *l) {
+  uv_mutex_lock(&l->lock);
+  bool finished = l->closed && !pn_collector_peek(l->collector) && !l->accept.front;
+  uv_mutex_unlock(&l->lock);
+  return finished;
 }
 
-static bool leader_process_listener(pn_listener_t * l) {
+/* Process a listener, return true if it has events for a worker thread */
+static bool leader_process_listener(pn_listener_t *l) {
+  /* NOTE: l may be concurrently accessed by on_connection() */
+
   if (l->psocket.tcp.data == NULL) {
+    /* Start listening if not already listening */
     leader_listen(l);
+  } else if (listener_finished(l)) {
+    /* Close if listener is finished. */
+    uv_safe_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
+    return false;
   } else {
-    uv_mutex_lock(&l->lock);
+    /* Process accepted connections if any */
     pconnection_t *pc;
-    while (!listener_finished_lh(l) && (pc = (pconnection_t*)pop_lh(&l->accept))) {
-      uv_mutex_unlock(&l->lock);
+    while ((pc = listener_pop(l))) {
       int err = leader_init(&pc->psocket);
-      if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
-      if (err) {
+      if (!err) {
+        err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
+      } else {
         listener_error(l, err, "accepting from");
-        psocket_notify(&l->psocket);
         pconnection_error(pc, err, "accepting from");
       }
       psocket_start(&pc->psocket);
-      uv_mutex_lock(&l->lock);
     }
-    if (listener_finished_lh(l)) {
-      uv_safe_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
-    }
-    uv_mutex_unlock(&l->lock);
   }
-  return listener_needs_work(l);
+  return listener_has_work(l);
 }
 
 /* Generate tick events and return millis till next tick or 0 if no tick is required */
 static pn_millis_t leader_tick(pconnection_t *pc) {
-  assert(!pc->psocket.working);
   uint64_t now = uv_now(pc->timer.loop);
   uint64_t next = pn_transport_tick(pc->driver.transport, now);
   return next ? next - now : 0;
@@ -525,14 +514,12 @@ static pn_millis_t leader_tick(pconnection_t *pc) {
 
 static void on_tick(uv_timer_t *timer) {
   pconnection_t *pc = (pconnection_t*)timer->data;
-  assert(!pc->psocket.working);
   leader_tick(pc);
-  pconnection_detach(pc);
+  psocket_notify(&pc->psocket);
 }
 
 static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
   pconnection_t *pc = (pconnection_t*)stream->data;
-  assert(!pc->psocket.working);
   if (nread >= 0) {
     pn_connection_driver_read_done(&pc->driver, nread);
   } else if (nread == UV_EOF) { /* hangup */
@@ -540,12 +527,11 @@ static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
   } else {
     pconnection_error(pc, nread, "on read from");
   }
-  pconnection_detach(pc);
+  psocket_notify(&pc->psocket);
 }
 
 static void on_write(uv_write_t* write, int err) {
   pconnection_t *pc = (pconnection_t*)write->data;
-  assert(!pc->psocket.working);
   size_t size = pc->writing;
   pc->writing = 0;
   if (err) {
@@ -553,7 +539,7 @@ static void on_write(uv_write_t* write, int err) {
   } else {
     pn_connection_driver_write_done(&pc->driver, size);
   }
-  pconnection_detach(pc);
+  psocket_notify(&pc->psocket);
 }
 
 static void on_timeout(uv_timer_t *timer) {
@@ -593,7 +579,6 @@ static pn_event_t *log_event(void* p, pn_event_t *e) {
 
 static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   pn_listener_t *l = batch_listener(batch);
-  assert(l->psocket.working);
   uv_mutex_lock(&l->lock);
   pn_event_t *e = pn_collector_next(l->collector);
   uv_mutex_unlock(&l->lock);
@@ -607,7 +592,6 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
 }
 
 static void pn_listener_free(pn_listener_t *l) {
-  /* No  assert(l->psocket.working);  can be called during shutdown */
   if (l) {
     if (l->collector) pn_collector_free(l->collector);
     if (l->condition) pn_condition_free(l->condition);
@@ -644,51 +628,73 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
   return NULL;
 }
 
-/* Process a pconnection, return true if it has work */
+/* Check and reset the wake flag */
+static bool check_wake(pconnection_t *pc) {
+  uv_mutex_lock(&pc->lock);
+  bool wake = pc->wake;
+  pc->wake = false;
+  uv_mutex_unlock(&pc->lock);
+  return wake;
+}
+
+/* Process a pconnection, return true if it has events for a worker thread */
 static bool leader_process_pconnection(pconnection_t *pc) {
+  /* Important to do the following steps in order */
   if (pc->psocket.tcp.data == NULL) {
+    /* Start the connection if not already connected */
     leader_connect(pc);
-  }
-  pn_millis_t next_tick = leader_tick(pc);
-  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
-  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-  if (pconnection_needs_work(pc)) {
-    return true;                /* Don't wait on IO while there is pending work */
-  }
-  if (pn_connection_driver_finished(&pc->driver)) {
+  } else if (pn_connection_driver_finished(&pc->driver)) {
+    /* Close if the connection is finished */
     uv_safe_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
-    return false;
-  }
-  /* Issue async IO requests */
-  int err = 0;
-  const char *what = NULL;
-
-  if (!err && next_tick) {
-    what = "connection timer start";
-    err = uv_timer_start(&pc->timer, on_tick, next_tick, 0);
-  }
-  if (!err && !pc->writing) {
-    what = "write";
-    if (wbuf.size > 0) {
-      uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
-      err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
-      if (!err) {
-        pc->writing = wbuf.size;
+  } else {
+    /* Check for events that can be generated without blocking for IO */
+    if (check_wake(pc)) {
+      pn_connection_t *c = pc->driver.connection;
+      pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+    }
+    pn_millis_t next_tick = leader_tick(pc);
+    pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+
+    /* If we still have no events, make async UV requests */
+    if (!pn_connection_driver_has_event(&pc->driver)) {
+      int err = 0;
+      const char *what = NULL;
+      if (!err && next_tick) {
+        what = "connection timer start";
+        err = uv_timer_start(&pc->timer, on_tick, next_tick, 0);
+      }
+      if (!err && !pc->writing) {
+        what = "write";
+        if (wbuf.size > 0) {
+          uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+          err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
+          if (!err) {
+            pc->writing = wbuf.size;
+          }
+        } else if (pn_connection_driver_write_closed(&pc->driver)) {
+          uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
+        }
+      }
+      if (!err && rbuf.size > 0) {
+        what = "read";
+        err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+      }
+      if (err) {
+        /* Some IO requests failed, generate the error events */
+        pconnection_error(pc, err, what);
       }
-    } else if (pn_connection_driver_write_closed(&pc->driver)) {
-      uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
     }
   }
-  if (!err && rbuf.size > 0) {
-    what = "read";
-    err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
-  }
-  if (err) {
-    pconnection_detach(pc);
-    pconnection_error(pc, err, what);
-    return true;
+  return pn_connection_driver_has_event(&pc->driver);
+}
+
+/* Detach a connection from the UV loop so it can be used safely by a worker */
+void pconnection_detach(pconnection_t *pc) {
+  if (!pc->writing) {           /* Can't detach while a write is pending */
+    uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+    uv_timer_stop(&pc->timer);
   }
-  return false;
 }
 
 /* Process the leader_q and the UV loop, in the leader thread */
@@ -698,12 +704,15 @@ static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
     assert(!ps->working);
 
     uv_mutex_unlock(&p->lock);  /* Unlock to process each item, may add more items to leader_q */
-    bool needs_work = ps->is_conn ?
+    bool has_work = ps->is_conn ?
       leader_process_pconnection(as_pconnection(ps)) :
       leader_process_listener(as_listener(ps));
     uv_mutex_lock(&p->lock);
 
-    if (needs_work && !ps->working && ps->next == &UNLISTED) {
+    if (has_work && !ps->working && ps->next == &UNLISTED) {
+      if (ps->is_conn) {
+        pconnection_detach(as_pconnection(ps));
+      }
       ps->working = true;
       push_lh(&p->worker_q, ps);
     }
@@ -869,9 +878,6 @@ void pn_proactor_free(pn_proactor_t *p) {
   uv_mutex_destroy(&p->lock);
   uv_cond_destroy(&p->cond);
   pn_collector_free(p->collector);
-  /* FIXME aconway 2017-02-25: memory leaks, need to clean up the queues */
-  /* assert(!p->worker_q.front); */
-  /* assert(!p->leader_q.front); */
   free(p);
 }
 
@@ -938,9 +944,8 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 }
 
 int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
-  assert(l->psocket.working);
-  assert(!l->closed);
   uv_mutex_lock(&l->lock);
+  assert(!l->closed);
   pconnection_t *pc = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
   if (!pc) {
     return PN_OUT_OF_MEMORY;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/4] qpid-proton git commit: PROTON-1403: minor example and documentation fixes

Posted by ac...@apache.org.
PROTON-1403: minor example and documentation fixes


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c9ee24a4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c9ee24a4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c9ee24a4

Branch: refs/heads/master
Commit: c9ee24a42a842cb24a67c042cac28f73e72d4dba
Parents: 6932dae
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Mar 6 17:55:38 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Mar 6 17:55:38 2017 -0500

----------------------------------------------------------------------
 examples/c/proactor/README.dox     | 2 --
 examples/c/proactor/broker.c       | 3 ++-
 proton-c/include/proton/proactor.h | 5 ++++-
 3 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9ee24a4/examples/c/proactor/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox
index 19083e5..3950a9a 100644
--- a/examples/c/proactor/README.dox
+++ b/examples/c/proactor/README.dox
@@ -18,6 +18,4 @@
  * @example broker.c
  *
  * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples.
- *
- * __Requires C++11__
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9ee24a4/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index 302bdec..d322ad0 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -252,7 +252,8 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {
     fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
             pn_condition_get_name(cond), pn_condition_get_description(cond));
-    pn_connection_close(pn_event_connection(e));
+    pn_connection_t *c = pn_event_connection(e);
+    if (c) pn_connection_close(c); /* It might be a listener event */
     exit_code = 1;
   }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9ee24a4/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 9a8a0ad..43b8ccb 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -153,11 +153,14 @@ PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor);
  * Cause PN_PROACTOR_TIMEOUT to be returned to a thread calling wait()
  * after timeout milliseconds. Thread-safe.
  *
- * Note that calling pn_proactor_set_timeout() again before the
+ * Note: calling pn_proactor_set_timeout() again before the
  * PN_PROACTOR_TIMEOUT is delivered will cancel the previous timeout
  * and deliver an event only after the new
  * timeout. `pn_proactor_set_timeout(0)` will cancel the timeout
  * without setting a new one.
+ *
+ * Note: PN_PROACTOR_TIMEOUT events will be delivered in series, never
+ * concurrently.
  */
 PNP_EXTERN void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org