You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/01/20 14:26:47 UTC

qpid-proton git commit: added INIT/UPDATED/FINAL events for selectables

Repository: qpid-proton
Updated Branches:
  refs/heads/master 5a10b7803 -> 4283872aa


added INIT/UPDATED/FINAL events for selectables


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4283872a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4283872a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4283872a

Branch: refs/heads/master
Commit: 4283872aafaa589c50c11c766707651e2667af31
Parents: 5a10b78
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Mon Jan 19 22:18:29 2015 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Tue Jan 20 08:25:52 2015 -0500

----------------------------------------------------------------------
 proton-c/bindings/python/proton/__init__.py     |   3 +
 proton-c/bindings/python/proton/reactors.py     |   7 +-
 proton-c/include/proton/event.h                 |  15 ++-
 proton-c/include/proton/reactor.h               |   2 +-
 proton-c/include/proton/selectable.h            |   5 +-
 proton-c/src/events/event.c                     |  22 +++-
 proton-c/src/reactor/acceptor.c                 |  18 ++-
 proton-c/src/reactor/reactor.c                  | 116 +++++++++++--------
 proton-c/src/selectable.c                       |   2 +
 proton-c/src/tests/reactor.c                    |  38 ++++--
 .../org/apache/qpid/proton/engine/Event.java    |   5 +-
 proton-j/src/main/resources/cengine.py          |   3 +
 12 files changed, 158 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 936d61d..0516b65 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -3354,9 +3354,12 @@ class Event(Wrapper, EventBase):
   TRANSPORT_TAIL_CLOSED = EventType(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
   TRANSPORT_CLOSED = EventType(PN_TRANSPORT_CLOSED, "on_transport_closed")
 
+  SELECTABLE_INIT = EventType(PN_SELECTABLE_INIT, "on_selectable_init")
+  SELECTABLE_UPDATED = EventType(PN_SELECTABLE_UPDATED, "on_selectable_updated")
   SELECTABLE_READABLE = EventType(PN_SELECTABLE_READABLE, "on_selectable_readable")
   SELECTABLE_WRITABLE = EventType(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
   SELECTABLE_EXPIRED = EventType(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
+  SELECTABLE_FINAL = EventType(PN_SELECTABLE_FINAL, "on_selectable_final")
 
   @staticmethod
   def wrap(impl):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index b70bf7e..612c38b 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -859,6 +859,9 @@ class Acceptor(Wrapper):
     def __init__(self, impl):
         Wrapper.__init__(self, impl)
 
+    def close(self):
+        pn_acceptor_close(self._impl)
+
 class Reactor(Wrapper):
 
     @staticmethod
@@ -904,9 +907,7 @@ class Reactor(Wrapper):
         return result
 
     def selectable(self):
-        impl = pn_reactor_selectable(self._impl)
-        pn_selectable_collect(impl, pn_reactor_collector(self._impl))
-        return Selectable.wrap(impl)
+        return Selectable.wrap(pn_reactor_selectable(self._impl))
 
     def update(self, sel):
         pn_reactor_update(self._impl, sel._impl)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index ae19e49..2a28dd2 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -283,9 +283,12 @@ typedef enum {
    */
   PN_TRANSPORT_CLOSED,
 
+  PN_SELECTABLE_INIT,
+  PN_SELECTABLE_UPDATED,
   PN_SELECTABLE_READABLE,
   PN_SELECTABLE_WRITABLE,
-  PN_SELECTABLE_EXPIRED
+  PN_SELECTABLE_EXPIRED,
+  PN_SELECTABLE_FINAL
 
 } pn_event_type_t;
 
@@ -315,6 +318,16 @@ PN_EXTERN pn_collector_t *pn_collector(void);
 PN_EXTERN void pn_collector_free(pn_collector_t *collector);
 
 /**
+ * Release a collector. Once in a released state a collector will
+ * drain any internally queued events (thereby releasing any pointers
+ * they may hold), shrink it's memory footprint to a minimum, and
+ * discard any newly created events.
+ *
+ * @param[in] collector a collector object
+ */
+PN_EXTERN void pn_collector_release(pn_collector_t *collector);
+
+/**
  * Place a new event on a collector.
  *
  * This operation will create a new event of the given type and

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-c/include/proton/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h
index 1b21195..d8eb0d1 100644
--- a/proton-c/include/proton/reactor.h
+++ b/proton-c/include/proton/reactor.h
@@ -75,7 +75,7 @@ PN_EXTERN void pn_reactor_run(pn_reactor_t *reactor);
 PN_EXTERN pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *handler);
 
 
-PN_EXTERN void pn_acceptor_close(pn_reactor_t *reactor, pn_acceptor_t *acceptor);
+PN_EXTERN void pn_acceptor_close(pn_acceptor_t *acceptor);
 
 PN_EXTERN pn_timer_t *pn_timer(pn_collector_t *collector);
 PN_EXTERN pn_timestamp_t pn_timer_deadline(pn_timer_t *timer);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-c/include/proton/selectable.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/selectable.h b/proton-c/include/proton/selectable.h
index 253f430..ac0416c 100644
--- a/proton-c/include/proton/selectable.h
+++ b/proton-c/include/proton/selectable.h
@@ -225,11 +225,10 @@ PN_EXTERN void pn_selectable_release(pn_selectable_t *selectable);
 PN_EXTERN void pn_selectable_free(pn_selectable_t *selectable);
 
 /**
- * Configure a selectable with a set of callbacks that emit events
- * into the provided collector.
+ * Configure a selectable with a set of callbacks that emit readable,
+ * writable, and expired events into the supplied collector.
  *
  * @param[in] selectable a selectable objet
- * @param[in] collector an event collector
  */
 PN_EXTERN void pn_selectable_collect(pn_selectable_t *selectable, pn_collector_t *collector);
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-c/src/events/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/events/event.c b/proton-c/src/events/event.c
index 0abd8c4..9b98aa1 100644
--- a/proton-c/src/events/event.c
+++ b/proton-c/src/events/event.c
@@ -28,6 +28,8 @@ static void pn_collector_initialize(pn_collector_t *collector)
 
 static void pn_collector_drain(pn_collector_t *collector)
 {
+  assert(collector);
+
   while (pn_collector_peek(collector)) {
     pn_collector_pop(collector);
   }
@@ -82,12 +84,20 @@ pn_collector_t *pn_collector(void)
 void pn_collector_free(pn_collector_t *collector)
 {
   assert(collector);
-  collector->freed = true;
-  pn_collector_drain(collector);
-  pn_collector_shrink(collector);
+  pn_collector_release(collector);
   pn_decref(collector);
 }
 
+void pn_collector_release(pn_collector_t *collector)
+{
+  assert(collector);
+  if (!collector->freed) {
+    collector->freed = true;
+    pn_collector_drain(collector);
+    pn_collector_shrink(collector);
+  }
+}
+
 pn_event_t *pn_event(void);
 
 pn_event_t *pn_collector_put(pn_collector_t *collector,
@@ -310,12 +320,18 @@ const char *pn_event_type_name(pn_event_type_t type)
     return "PN_TRANSPORT_TAIL_CLOSED";
   case PN_TRANSPORT_CLOSED:
     return "PN_TRANSPORT_CLOSED";
+  case PN_SELECTABLE_INIT:
+    return "PN_SELECTABLE_INIT";
+  case PN_SELECTABLE_UPDATED:
+    return "PN_SELECTABLE_UPDATED";
   case PN_SELECTABLE_READABLE:
     return "PN_SELECTABLE_READABLE";
   case PN_SELECTABLE_WRITABLE:
     return "PN_SELECTABLE_WRITABLE";
   case PN_SELECTABLE_EXPIRED:
     return "PN_SELECTABLE_EXPIRED";
+  case PN_SELECTABLE_FINAL:
+    return "PN_SELECTABLE_FINAL";
   }
 
   return "<unrecognized>";

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-c/src/reactor/acceptor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c
index 4be9543..2ca1b1a 100644
--- a/proton-c/src/reactor/acceptor.c
+++ b/proton-c/src/reactor/acceptor.c
@@ -48,7 +48,9 @@ void pni_acceptor_readable(pn_selectable_t *sel) {
 
 void pni_acceptor_finalize(pn_selectable_t *sel) {
   pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
-  pn_close(pn_reactor_io(reactor), pn_selectable_get_fd(sel));
+  if (pn_selectable_get_fd(sel) != PN_INVALID_SOCKET) {
+    pn_close(pn_reactor_io(reactor), pn_selectable_get_fd(sel));
+  }
 }
 
 pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port, pn_handler_t *handler) {
@@ -64,10 +66,14 @@ pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, cons
   return (pn_acceptor_t *) sel;
 }
 
-void pn_acceptor_close(pn_reactor_t *reactor, pn_acceptor_t *acceptor) {
+void pn_acceptor_close(pn_acceptor_t *acceptor) {
   pn_selectable_t *sel = (pn_selectable_t *) acceptor;
-  pn_socket_t socket = pn_selectable_get_fd(sel);
-  pn_close(pn_reactor_io(reactor), socket);
-  pn_selectable_set_fd(sel, PN_INVALID_SOCKET);
-  pn_selector_remove(pn_reactor_selector(reactor), sel);
+  if (!pn_selectable_is_terminal(sel)) {
+    pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+    pn_socket_t socket = pn_selectable_get_fd(sel);
+    pn_close(pn_reactor_io(reactor), socket);
+    pn_selectable_set_fd(sel, PN_INVALID_SOCKET);
+    pn_selectable_terminate(sel);
+    pn_reactor_update(reactor, sel);
+  }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index 0c30fce..78119fc 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -43,7 +43,8 @@ struct pn_reactor_t {
   pn_collector_t *collector;
   pn_handler_t *handler;
   pn_list_t *children;
-  pn_selectable_t *timer;
+  pn_timer_t *timer;
+  pn_selectable_t *selectable;
   pn_timestamp_t now;
   bool selected;
 };
@@ -62,7 +63,8 @@ static void pn_reactor_initialize(pn_reactor_t *reactor) {
   reactor->collector = pn_collector();
   reactor->handler = pn_handler(pn_dummy_dispatch);
   reactor->children = pn_list(PN_OBJECT, 0);
-  reactor->timer = NULL;
+  reactor->timer = pn_timer(reactor->collector);
+  reactor->selectable = NULL;
   reactor->now = pn_i_now();
   reactor->selected = false;
 }
@@ -74,47 +76,17 @@ static void pn_reactor_finalize(pn_reactor_t *reactor) {
   pn_decref(reactor->collector);
   pn_decref(reactor->handler);
   pn_decref(reactor->children);
+  pn_decref(reactor->timer);
 }
 
 #define pn_reactor_hashcode NULL
 #define pn_reactor_compare NULL
 #define pn_reactor_inspect NULL
 
-pn_timer_t *pni_timer(pn_selectable_t *sel) {
-  pn_record_t *record = pn_selectable_attachments(sel);
-  return (pn_timer_t *) pn_record_get(record, 0x1);
-}
-
-static pn_timestamp_t pni_timer_deadline(pn_selectable_t *sel) {
-  pn_timer_t *timer = pni_timer(sel);
-  return pn_timer_deadline(timer);
-}
-
-static void pni_timer_expired(pn_selectable_t *sel) {
-  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
-  pn_timer_t *timer = pni_timer(sel);
-  pn_timer_tick(timer, reactor->now);
-}
-
-pn_selectable_t *pni_selectable_timer(pn_reactor_t *reactor) {
-  pn_selectable_t *sel = pn_reactor_selectable(reactor);
-  pn_selectable_on_expired(sel, pni_timer_expired);
-  pn_record_t *record = pn_selectable_attachments(sel);
-  pn_record_def(record, 0x1, PN_OBJECT);
-  pn_timer_t *timer = pn_timer(reactor->collector);
-  pn_record_set(record, 0x1, timer);
-  pn_decref(timer);
-  pn_selectable_set_deadline(sel, pni_timer_deadline(sel));
-  pn_reactor_update(reactor, sel);
-  return sel;
-}
-
 PN_CLASSDEF(pn_reactor)
 
 pn_reactor_t *pn_reactor() {
-  pn_reactor_t *reactor = pn_reactor_new();
-  reactor->timer = pni_selectable_timer(reactor);
-  return reactor;
+  return pn_reactor_new();
 }
 
 pn_record_t *pn_reactor_attachments(pn_reactor_t *reactor) {
@@ -124,6 +96,7 @@ pn_record_t *pn_reactor_attachments(pn_reactor_t *reactor) {
 
 void pn_reactor_free(pn_reactor_t *reactor) {
   if (reactor) {
+    pn_collector_release(reactor->collector);
     pn_handler_free(reactor->handler);
     reactor->handler = NULL;
     pn_decref(reactor);
@@ -157,14 +130,16 @@ pn_list_t *pn_reactor_children(pn_reactor_t *reactor) {
 
 static void pni_selectable_release(pn_selectable_t *selectable) {
   pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(selectable);
+  pn_collector_put(reactor->collector, PN_OBJECT, selectable, PN_SELECTABLE_FINAL);
   pn_list_remove(reactor->children, selectable);
 }
 
 pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor) {
   assert(reactor);
   pn_selectable_t *sel = pn_selectable();
+  pn_selectable_collect(sel, reactor->collector);
+  pn_collector_put(reactor->collector, PN_OBJECT, sel, PN_SELECTABLE_INIT);
   pni_selectable_set_context(sel, reactor);
-  pn_selector_add(reactor->selector, sel);
   pn_list_add(reactor->children, sel);
   pn_selectable_on_release(sel, pni_selectable_release);
   pn_decref(sel);
@@ -173,7 +148,7 @@ pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor) {
 
 void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable) {
   assert(reactor);
-  pn_selector_update(reactor->selector, selectable);
+  pn_collector_put(reactor->collector, PN_OBJECT, selectable, PN_SELECTABLE_UPDATED);
 }
 
 void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event);
@@ -196,6 +171,23 @@ static void pni_reactor_dispatch_post(pn_reactor_t *reactor, pn_event_t *event)
   assert(reactor);
   assert(event);
   switch (pn_event_type(event)) {
+  case PN_SELECTABLE_INIT:
+    {
+      pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
+      pn_selector_add(reactor->selector, sel);
+    }
+    break;
+  case PN_SELECTABLE_UPDATED:
+    {
+      pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
+      if (pn_selectable_is_terminal(sel)) {
+        pn_selector_remove(reactor->selector, sel);
+        pn_selectable_release(sel);
+      } else {
+        pn_selector_update(reactor->selector, sel);
+      }
+    }
+    break;
   case PN_TRANSPORT:
     pni_handle_transport(reactor, event);
     break;
@@ -222,8 +214,8 @@ void pni_record_init_handler(pn_record_t *record, pn_handler_t *handler) {
   pn_record_set(record, PN_HANDLER, handler);
 }
 
-static void *pni_reactor = NULL;
-#define PN_REACTOR ((pn_handle_t) &pni_reactor)
+static void *pni_reactor_handle = NULL;
+#define PN_REACTOR ((pn_handle_t) &pni_reactor_handle)
 
 pn_reactor_t *pni_record_get_reactor(pn_record_t *record) {
   return (pn_reactor_t *) pn_record_get(record, PN_REACTOR);
@@ -251,6 +243,10 @@ static pn_connection_t *pni_object_connection(const pn_class_t *clazz, void *obj
   }
 }
 
+static pn_reactor_t *pni_reactor(pn_selectable_t *sel) {
+  return (pn_reactor_t *) pni_selectable_get_context(sel);
+}
+
 pn_reactor_t *pn_event_reactor(pn_event_t *event) {
   const pn_class_t *clazz = pn_event_class(event);
   void *context = pn_event_context(event);
@@ -273,7 +269,7 @@ pn_reactor_t *pn_event_reactor(pn_event_t *event) {
   case CID_pn_selectable:
     {
       pn_selectable_t *sel = (pn_selectable_t *) pn_event_context(event);
-      return (pn_reactor_t *) pni_selectable_get_context(sel);
+      return pni_reactor(sel);
     }
   default:
     return NULL;
@@ -305,13 +301,14 @@ pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler)
 }
 
 pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *handler) {
-  pn_timer_t *timer = pni_timer(reactor->timer);
-  pn_task_t *task = pn_timer_schedule(timer, reactor->now + delay);
+  pn_task_t *task = pn_timer_schedule(reactor->timer, reactor->now + delay);
   pn_record_t *record = pn_task_attachments(task);
   pni_record_init_reactor(record, reactor);
   pni_record_init_handler(record, handler);
-  pn_selectable_set_deadline(reactor->timer, pni_timer_deadline(reactor->timer));
-  pn_reactor_update(reactor, reactor->timer);
+  if (reactor->selectable) {
+    pn_selectable_set_deadline(reactor->selectable, pn_timer_deadline(reactor->timer));
+    pn_reactor_update(reactor, reactor->selectable);
+  }
   return task;
 }
 
@@ -327,10 +324,24 @@ void pn_reactor_process(pn_reactor_t *reactor) {
   }
 }
 
+static void pni_timer_expired(pn_selectable_t *sel) {
+  pn_reactor_t *reactor = pni_reactor(sel);
+  pn_timer_tick(reactor->timer, reactor->now);
+}
+
+pn_selectable_t *pni_timer_selectable(pn_reactor_t *reactor) {
+  pn_selectable_t *sel = pn_reactor_selectable(reactor);
+  pn_selectable_on_expired(sel, pni_timer_expired);
+  pn_selectable_set_deadline(sel, pn_timer_deadline(reactor->timer));
+  pn_reactor_update(reactor, sel);
+  return sel;
+}
+
 void pn_reactor_start(pn_reactor_t *reactor) {
   assert(reactor);
   pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_INIT);
-}
+  reactor->selectable = pni_timer_selectable(reactor);
+ }
 
 bool pn_reactor_work(pn_reactor_t *reactor, int timeout) {
   assert(reactor);
@@ -339,15 +350,21 @@ bool pn_reactor_work(pn_reactor_t *reactor, int timeout) {
 
   if (pn_selector_size(reactor->selector) == 1) {
     if (reactor->selected) {
-      pn_timer_t *timer = pni_timer(reactor->timer);
-      if (!pn_timer_tasks(timer)) {
-        return false;
+      if (!pn_timer_tasks(reactor->timer) && reactor->selectable) {
+        pn_selectable_terminate(reactor->selectable);
+        pn_reactor_update(reactor, reactor->selectable);
+        reactor->selectable = NULL;
+        return true;
       }
     } else {
       timeout = 0;
     }
   }
 
+  if (!pn_selector_size(reactor->selector)) {
+    return false;
+  }
+
   pn_selector_select(reactor->selector, timeout);
   pn_selectable_t *sel;
   int events;
@@ -362,10 +379,6 @@ bool pn_reactor_work(pn_reactor_t *reactor, int timeout) {
     if (events & PN_EXPIRED) {
       pn_selectable_expired(sel);
     }
-    if (pn_selectable_is_terminal(sel)) {
-      pn_selector_remove(reactor->selector, sel);
-      pn_selectable_release(sel);
-    }
   }
 
   reactor->selected = true;
@@ -377,6 +390,7 @@ void pn_reactor_stop(pn_reactor_t *reactor) {
   assert(reactor);
   pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_FINAL);
   pn_reactor_process(reactor);
+  pn_collector_release(reactor->collector);
 }
 
 void pn_reactor_run(pn_reactor_t *reactor) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-c/src/selectable.c
----------------------------------------------------------------------
diff --git a/proton-c/src/selectable.c b/proton-c/src/selectable.c
index ec3c193..94856ab 100644
--- a/proton-c/src/selectable.c
+++ b/proton-c/src/selectable.c
@@ -81,6 +81,7 @@ void pn_selectable_finalize(pn_selectable_t *sel)
     sel->finalize(sel);
   }
   pn_decref(sel->attachments);
+  pn_decref(sel->collector);
 }
 
 #define pn_selectable_hashcode NULL
@@ -267,6 +268,7 @@ void pn_selectable_collect(pn_selectable_t *selectable, pn_collector_t *collecto
   pn_decref(selectable->collector);
   selectable->collector = collector;
   pn_incref(selectable->collector);
+
   if (collector) {
     pn_selectable_on_readable(selectable, pni_readable);
     pn_selectable_on_writable(selectable, pni_writable);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
index ce8c5cf..dc61274 100644
--- a/proton-c/src/tests/reactor.c
+++ b/proton-c/src/tests/reactor.c
@@ -37,6 +37,12 @@ static void test_reactor(void) {
   pn_free(reactor);
 }
 
+static void test_reactor_free(void) {
+  pn_reactor_t *reactor = pn_reactor();
+  assert(reactor);
+  pn_reactor_free(reactor);
+}
+
 static void test_reactor_run(void) {
   pn_reactor_t *reactor = pn_reactor();
   assert(reactor);
@@ -45,6 +51,14 @@ static void test_reactor_run(void) {
   pn_free(reactor);
 }
 
+static void test_reactor_run_free(void) {
+  pn_reactor_t *reactor = pn_reactor();
+  assert(reactor);
+  // run should exit if there is nothing left to do
+  pn_reactor_run(reactor);
+  pn_reactor_free(reactor);
+}
+
 typedef struct {
   pn_reactor_t *reactor;
   pn_list_t *events;
@@ -123,7 +137,8 @@ static void test_reactor_handler_run(void) {
   pn_handler_t *th = test_handler(reactor, events);
   pn_handler_add(handler, th);
   pn_reactor_run(reactor);
-  expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
+  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED,
+         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
   pn_free(reactor);
   pn_free(th);
   pn_free(events);
@@ -137,7 +152,8 @@ static void test_reactor_handler_run_free(void) {
   pn_list_t *events = pn_list(PN_VOID, 0);
   pn_handler_add(handler, test_handler(reactor, events));
   pn_reactor_run(reactor);
-  expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
+  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED,
+         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
   pn_reactor_free(reactor);
   pn_free(events);
 }
@@ -153,7 +169,8 @@ static void test_reactor_connection(void) {
   pn_list_t *revents = pn_list(PN_VOID, 0);
   pn_handler_add(root, test_handler(reactor, revents));
   pn_reactor_run(reactor);
-  expect(revents, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
+  expect(revents, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED,
+         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
   expect(cevents, PN_CONNECTION_INIT, END);
   pn_reactor_free(reactor);
   pn_handler_free(tch);
@@ -178,8 +195,7 @@ static void tra_dispatch(pn_handler_t *handler, pn_event_t *event) {
   case PN_REACTOR_INIT:
     {
       pn_acceptor_t *acceptor = *tram(handler);
-      pn_reactor_t *reactor = (pn_reactor_t *) pn_event_context(event);
-      pn_acceptor_close(reactor, acceptor);
+      pn_acceptor_close(acceptor);
     }
     break;
   default:
@@ -223,7 +239,7 @@ static void server_dispatch(pn_handler_t *handler, pn_event_t *event) {
     pn_connection_open(pn_event_connection(event));
     break;
   case PN_CONNECTION_REMOTE_CLOSE:
-    pn_acceptor_close(srv->reactor, srv->acceptor);
+    pn_acceptor_close(srv->acceptor);
     pn_connection_close(pn_event_connection(event));
     pn_connection_release(pn_event_connection(event));
     break;
@@ -268,7 +284,6 @@ static void test_reactor_connect(void) {
   srv->reactor = reactor;
   srv->acceptor = acceptor;
   srv->events = pn_list(PN_VOID, 0);
-  pn_decref(sh);
   pn_handler_t *ch = pn_handler_new(client_dispatch, sizeof(client_t), NULL);
   client_t *cli = cmem(ch);
   cli->events = pn_list(PN_VOID, 0);
@@ -282,6 +297,7 @@ static void test_reactor_connect(void) {
          PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED,
          PN_CONNECTION_UNBOUND, PN_CONNECTION_FINAL, END);
   pn_free(srv->events);
+  pn_decref(sh);
   expect(cli->events, PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN,
          PN_CONNECTION_BOUND, PN_TRANSPORT, PN_TRANSPORT,
          PN_CONNECTION_REMOTE_OPEN, PN_CONNECTION_LOCAL_CLOSE,
@@ -403,7 +419,8 @@ static void test_reactor_schedule(void) {
   pn_reactor_schedule(reactor, 0, NULL);
   pn_reactor_run(reactor);
   pn_reactor_free(reactor);
-  expect(events, PN_REACTOR_INIT, PN_TIMER_TASK, PN_REACTOR_FINAL, END);
+  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_TIMER_TASK,
+         PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
   pn_free(events);
 }
 
@@ -418,7 +435,8 @@ static void test_reactor_schedule_handler(void) {
   pn_reactor_run(reactor);
   pn_reactor_free(reactor);
   pn_handler_free(th);
-  expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END);
+  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, PN_SELECTABLE_UPDATED,
+         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
   expect(tevents, PN_TIMER_TASK, END);
   pn_free(events);
   pn_free(tevents);
@@ -427,7 +445,9 @@ static void test_reactor_schedule_handler(void) {
 int main(int argc, char **argv)
 {
   test_reactor();
+  test_reactor_free();
   test_reactor_run();
+  test_reactor_run_free();
   test_reactor_handler();
   test_reactor_handler_free();
   test_reactor_handler_run();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
index 1970085..ed3a421 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
@@ -69,9 +69,12 @@ public interface Event
         TRANSPORT_TAIL_CLOSED,
         TRANSPORT_CLOSED,
 
+        SELECTABLE_INIT,
+        SELECTABLE_UPDATED,
         SELECTABLE_READABLE,
         SELECTABLE_WRITABLE,
-        SELECTABLE_EXPIRED
+        SELECTABLE_EXPIRED,
+        SELECTABLE_FINAL
     }
 
     Type getType();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4283872a/proton-j/src/main/resources/cengine.py
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/cengine.py b/proton-j/src/main/resources/cengine.py
index 5c0de79..75624c9 100644
--- a/proton-j/src/main/resources/cengine.py
+++ b/proton-j/src/main/resources/cengine.py
@@ -984,9 +984,12 @@ PN_TRANSPORT_ERROR = Event.Type.TRANSPORT_ERROR
 PN_TRANSPORT_HEAD_CLOSED = Event.Type.TRANSPORT_HEAD_CLOSED
 PN_TRANSPORT_TAIL_CLOSED = Event.Type.TRANSPORT_TAIL_CLOSED
 PN_TRANSPORT_CLOSED = Event.Type.TRANSPORT_CLOSED
+PN_SELECTABLE_INIT = Event.Type.SELECTABLE_INIT
+PN_SELECTABLE_UPDATED = Event.Type.SELECTABLE_UPDATED
 PN_SELECTABLE_READABLE = Event.Type.SELECTABLE_READABLE
 PN_SELECTABLE_WRITABLE = Event.Type.SELECTABLE_WRITABLE
 PN_SELECTABLE_EXPIRED = Event.Type.SELECTABLE_EXPIRED
+PN_SELECTABLE_FINAL = Event.Type.SELECTABLE_FINAL
 
 def pn_collector():
   return Proton.collector()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org