You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/01/17 02:56:53 UTC

qpid-proton git commit: added pn_selectable_collect and modified callbacks to allow for embedding languages to define their own selectables

Repository: qpid-proton
Updated Branches:
  refs/heads/master 6982026d7 -> c69cfe64f


added pn_selectable_collect and modified callbacks to allow for embedding languages to define their own selectables


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c69cfe64
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c69cfe64
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c69cfe64

Branch: refs/heads/master
Commit: c69cfe64f8f7190c973da297eb7761de4e9a1eb5
Parents: 6982026
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Fri Jan 16 20:41:26 2015 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Fri Jan 16 20:46:07 2015 -0500

----------------------------------------------------------------------
 proton-c/bindings/javascript/CMakeLists.txt |   2 +-
 proton-c/bindings/javascript/module.js      |   6 +-
 proton-c/bindings/python/proton/__init__.py |  44 +++++--
 proton-c/bindings/python/setup.py           |  30 +++++
 proton-c/include/proton/event.h             |   6 +-
 proton-c/include/proton/selectable.h        |  49 ++++----
 proton-c/src/events/event.c                 |   6 +
 proton-c/src/messenger/messenger.c          | 101 +++++-----------
 proton-c/src/posix/selector.c               |   6 +-
 proton-c/src/reactor/acceptor.c             |  10 +-
 proton-c/src/reactor/connection.c           |  91 +++++++-------
 proton-c/src/reactor/reactor.c              |   5 +-
 proton-c/src/selectable.c                   | 147 +++++++++++------------
 proton-c/src/selectable.h                   |   7 --
 tests/python/proton_tests/messenger.py      |   4 +-
 15 files changed, 262 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/bindings/javascript/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/CMakeLists.txt b/proton-c/bindings/javascript/CMakeLists.txt
index 288201f..d10212c 100644
--- a/proton-c/bindings/javascript/CMakeLists.txt
+++ b/proton-c/bindings/javascript/CMakeLists.txt
@@ -219,7 +219,7 @@ set_target_properties(
   # fiddly with node.js packages. This behaviour might be reinstated if the
   # packaging mechanism improves.
 
-  LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" ${EMSCRIPTEN_LINK_OPTIMISATIONS} --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/module.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/error.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/messenger.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/subscription.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/message.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-uuid.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-symbol.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-described.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-array.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-typed-number.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-long.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-binary.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_pn_get_version_major', '_
 pn_get_version_minor', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_recv', '_pn_messenger_receiving', '_pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming',  '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_messenger_set_passive', '_pn_messenger_selectable', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_su
 bscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_p
 n_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_p
 n_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_decimal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump', '_pn_selectable_readable', '_pn_selectable_capacity', '_pn_selectable_writable', '_pn_selectable_pending', '_pn_selectable_is_terminal'
 , '_pn_selectable_fd', '_pn_selectable_free']\""
+  LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" ${EMSCRIPTEN_LINK_OPTIMISATIONS} --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/module.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/error.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/messenger.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/subscription.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/message.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-uuid.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-symbol.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-described.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-array.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-typed-number.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-long.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-binary.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_pn_get_version_major', '_
 pn_get_version_minor', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_recv', '_pn_messenger_receiving', '_pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming',  '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_messenger_set_passive', '_pn_messenger_selectable', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_su
 bscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_p
 n_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_p
 n_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_decimal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump', '_pn_selectable_readable', '_pn_selectable_is_reading', '_pn_selectable_writable', '_pn_selectable_is_writing', '_pn_selectable_is_term
 inal', '_pn_selectable_get_fd', '_pn_selectable_free']\""
   )
 
 # This command packages up the compiled proton.js into a node.js package called

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/bindings/javascript/module.js
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/module.js b/proton-c/bindings/javascript/module.js
index 59d2517..3603de3 100644
--- a/proton-c/bindings/javascript/module.js
+++ b/proton-c/bindings/javascript/module.js
@@ -221,7 +221,7 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl
         for (var name in _messengers) {
             var messenger = _messengers[name];
             while ((sel = _pn_messenger_selectable(messenger._messenger))) {
-                fd = _pn_selectable_fd(sel);
+                fd = _pn_selectable_get_fd(sel);
                 // Only register valid selectables, otherwise free them.
                 if (fd === -1) {
                     _pn_selectable_free(sel);
@@ -268,8 +268,8 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl
                         if (sock.sock_ops.poll) {
                             var mask = sock.sock_ops.poll(sock); // Low-level poll call.
                             if (mask) {
-                                var capacity = _pn_selectable_capacity(sel) > 0;
-                                var pending = _pn_selectable_pending(sel) > 0;
+                                var capacity = _pn_selectable_is_reading(sel);
+                                var pending = _pn_selectable_is_writing(sel);
 
                                 if ((mask & POLLIN) && capacity) {
 //console.log("- readable fd = " + fd + ", capacity = " + _pn_selectable_capacity(sel));

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 6e0a44d..98cb9d7 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -1156,35 +1156,57 @@ class Subscription(object):
   def address(self):
     return pn_subscription_address(self._impl)
 
+_DEFAULT = object()
+
 class Selectable(object):
 
   def __init__(self, messenger, impl):
     self.messenger = messenger
     self._impl = impl
 
-  def fileno(self):
+  def fileno(self, fd = _DEFAULT):
     if not self._impl: raise ValueError("selectable freed")
-    return pn_selectable_get_fd(self._impl)
+    if fd is _DEFAULT:
+      return pn_selectable_get_fd(self._impl)
+    elif fd is None:
+      pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
+    else:
+      pn_selectable_set_fd(self._impl, fd)
 
-  @property
-  def capacity(self):
+  def _is_reading(self):
     if not self._impl: raise ValueError("selectable freed")
-    return pn_selectable_capacity(self._impl)
+    return pn_selectable_is_reading(self._impl)
 
-  @property
-  def pending(self):
+  def _set_reading(self, val):
     if not self._impl: raise ValueError("selectable freed")
-    return pn_selectable_pending(self._impl)
+    pn_selectable_set_reading(self._impl, bool(val))
 
-  @property
-  def deadline(self):
+  reading = property(_is_reading, _set_reading)
+
+  def _is_writing(self):
     if not self._impl: raise ValueError("selectable freed")
-    tstamp = pn_selectable_deadline(self._impl)
+    return pn_selectable_is_writing(self._impl)
+
+  def _set_writing(self, val):
+    if not self._impl: raise ValueError("selectable freed")
+    pn_selectable_set_writing(self._impl, bool(val))
+
+  writing = property(_is_writing, _set_writing)
+
+  def _get_deadline(self):
+    if not self._impl: raise ValueError("selectable freed")
+    tstamp = pn_selectable_get_deadline(self._impl)
     if tstamp:
       return millis2secs(tstamp)
     else:
       return None
 
+  def _set_deadline(self, deadline):
+    if not self._impl: raise ValueError("selectable freed")
+    pn_selectable_set_deadline(self._impl, secs2millis(deadline))
+
+  deadline = property(_get_deadline, _set_deadline)
+
   def readable(self):
     if not self._impl: raise ValueError("selectable freed")
     pn_selectable_readable(self._impl)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/bindings/python/setup.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/setup.py b/proton-c/bindings/python/setup.py
new file mode 100755
index 0000000..9765342
--- /dev/null
+++ b/proton-c/bindings/python/setup.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from distutils.core import setup, Extension
+
+setup(name='Proton',
+      version='0.9',
+      description='An AMQP based messaging library.',
+      author='Apache Qpid',
+      author_email='proton@qpid.apache.org',
+      url='http://qpid.apache.org/proton/',
+      packages=['proton'],
+      ext_modules=[Extension('cproton', ['cproton.i'])])

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 34d60c3..ae19e49 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -281,7 +281,11 @@ typedef enum {
    * Indicates that the both the head and tail of the transport are
    * closed. Events of this type point to the relevant transport.
    */
-  PN_TRANSPORT_CLOSED
+  PN_TRANSPORT_CLOSED,
+
+  PN_SELECTABLE_READABLE,
+  PN_SELECTABLE_WRITABLE,
+  PN_SELECTABLE_EXPIRED
 
 } pn_event_type_t;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/include/proton/selectable.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/selectable.h b/proton-c/include/proton/selectable.h
index 884efd2..615b72b 100644
--- a/proton-c/include/proton/selectable.h
+++ b/proton-c/include/proton/selectable.h
@@ -24,6 +24,7 @@
 
 #include <proton/import_export.h>
 #include <proton/object.h>
+#include <proton/event.h>
 #include <proton/io.h>
 #include <proton/type_compat.h>
 
@@ -92,13 +93,10 @@ PN_EXTERN void pn_selectables_free(pn_selectables_t *selectables);
 
 PN_EXTERN pn_selectable_t *pn_selectable(void);
 
-PN_EXTERN void pn_selectable_set_capacity(pn_selectable_t *sel, ssize_t (*capacity)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_pending(pn_selectable_t *sel, ssize_t (*pending)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_deadline(pn_selectable_t *sel, pn_timestamp_t (*deadline)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *));
+PN_EXTERN void pn_selectable_on_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *));
+PN_EXTERN void pn_selectable_on_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *));
+PN_EXTERN void pn_selectable_on_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *));
+PN_EXTERN void pn_selectable_on_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *));
 
 PN_EXTERN pn_record_t *pn_selectable_attachments(pn_selectable_t *sel);
 
@@ -119,28 +117,24 @@ PN_EXTERN pn_socket_t pn_selectable_get_fd(pn_selectable_t *selectable);
 PN_EXTERN void pn_selectable_set_fd(pn_selectable_t *selectable, pn_socket_t fd);
 
 /**
- * Get the capacity of a selectable.
- *
- * A selectable with a positive capacity is interested in being
- * notified of read events. A negative capacity indicates that the
- * selectable will never be interested in read events ever again.
+ * Check if a selectable is interested in readable events.
  *
  * @param[in] selectable a selectable object
- * @return the selectables capacity
+ * @return true iff the selectable is interested in read events
  */
-PN_EXTERN ssize_t pn_selectable_capacity(pn_selectable_t *selectable);
+PN_EXTERN bool pn_selectable_is_reading(pn_selectable_t *selectable);
+
+PN_EXTERN void pn_selectable_set_reading(pn_selectable_t *sel, bool reading);
 
 /**
- * Get the number of bytes pending for a selectable.
- *
- * A selectable with pending bytes is interested in being notified of
- * write events. If this value is negative then the selectable will
- * never be interested in write events ever again.
+ * Check if a selectable is interested in writable events.
  *
  * @param[in] selectable a selectable object
- * @return the number of bytes pending for the selectable
+ * @return true iff the selectable is interested in writable events
  */
-PN_EXTERN ssize_t pn_selectable_pending(pn_selectable_t *selectable);
+PN_EXTERN bool pn_selectable_is_writing(pn_selectable_t *selectable);
+
+  PN_EXTERN void pn_selectable_set_writing(pn_selectable_t *sel, bool writing);
 
 /**
  * Get the next deadline for a selectable.
@@ -152,7 +146,9 @@ PN_EXTERN ssize_t pn_selectable_pending(pn_selectable_t *selectable);
  * @param[in] selectable a selectable object
  * @return the next deadline or zero
  */
-PN_EXTERN pn_timestamp_t pn_selectable_deadline(pn_selectable_t *selectable);
+PN_EXTERN pn_timestamp_t pn_selectable_get_deadline(pn_selectable_t *selectable);
+
+PN_EXTERN void pn_selectable_set_deadline(pn_selectable_t *sel, pn_timestamp_t deadline);
 
 /**
  * Notify a selectable that the file descriptor is readable.
@@ -226,6 +222,15 @@ PN_EXTERN void pn_selectable_terminate(pn_selectable_t *selectable);
 PN_EXTERN void pn_selectable_free(pn_selectable_t *selectable);
 
 /**
+ * Configure a selectable with a set of callbacks that emit events
+ * into the provided collector.
+ *
+ * @param[in] selectable a selectable object
+ * @param[in] collector an event collector
+ */
+PN_EXTERN void pn_selectable_collect(pn_selectable_t *selectable, pn_collector_t *collector);
+
+/**
  * @}
  */
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/events/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/events/event.c b/proton-c/src/events/event.c
index 1670704..0abd8c4 100644
--- a/proton-c/src/events/event.c
+++ b/proton-c/src/events/event.c
@@ -310,6 +310,12 @@ const char *pn_event_type_name(pn_event_type_t type)
     return "PN_TRANSPORT_TAIL_CLOSED";
   case PN_TRANSPORT_CLOSED:
     return "PN_TRANSPORT_CLOSED";
+  case PN_SELECTABLE_READABLE:
+    return "PN_SELECTABLE_READABLE";
+  case PN_SELECTABLE_WRITABLE:
+    return "PN_SELECTABLE_WRITABLE";
+  case PN_SELECTABLE_EXPIRED:
+    return "PN_SELECTABLE_EXPIRED";
   }
 
   return "<unrecognized>";

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/messenger/messenger.c
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c
index b46ca69..ec3fd3d 100644
--- a/proton-c/src/messenger/messenger.c
+++ b/proton-c/src/messenger/messenger.c
@@ -192,6 +192,17 @@ static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
   return ctx->messenger->next_drain;
 }
 
+static void pni_connection_update(pn_selectable_t *sel) {
+  ssize_t c = pni_connection_capacity(sel);
+  pn_selectable_set_reading(sel, c > 0);
+  ssize_t p = pni_connection_pending(sel);
+  pn_selectable_set_writing(sel, p > 0);
+  pn_selectable_set_deadline(sel, pni_connection_deadline(sel));
+  if (c < 0 && p < 0) {
+    pn_selectable_terminate(sel);
+  }
+}
+
 #include <errno.h>
 
 static void pn_error_report(const char *pfx, const char *error)
@@ -211,6 +222,7 @@ void pni_modified(pn_ctx_t *ctx)
 
 void pni_conn_modified(pn_connection_ctx_t *ctx)
 {
+  pni_connection_update(ctx->selectable);
   pni_modified((pn_ctx_t *) ctx);
 }
 
@@ -296,21 +308,6 @@ static void pni_connection_finalize(pn_selectable_t *sel)
   pni_messenger_reclaim(ctx->messenger, ctx->connection);
 }
 
-static ssize_t pni_listener_capacity(pn_selectable_t *sel)
-{
-  return 1;
-}
-
-static ssize_t pni_listener_pending(pn_selectable_t *sel)
-{
-  return 0;
-}
-
-static pn_timestamp_t pni_listener_deadline(pn_selectable_t *sel)
-{
-  return 0;
-}
-
 pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
                                          pn_socket_t sock,
                                          const char *scheme,
@@ -340,16 +337,7 @@ static void pni_listener_readable(pn_selectable_t *sel)
 
   pn_connection_t *conn = pn_messenger_connection(ctx->messenger, sock, scheme, NULL, NULL, NULL, NULL, ctx);
   pn_transport_bind(t, conn);
-}
-
-static void pni_listener_writable(pn_selectable_t *sel)
-{
-  // do nothing
-}
-
-static void pni_listener_expired(pn_selectable_t *sel)
-{
-  // do nothing
+  pni_conn_modified((pn_connection_ctx_t *) pn_connection_get_context(conn));
 }
 
 static void pn_listener_ctx_free(pn_messenger_t *messenger, pn_listener_ctx_t *ctx);
@@ -417,13 +405,10 @@ static pn_listener_ctx_t *pn_listener_ctx(pn_messenger_t *messenger,
   ctx->host = pn_strdup(host);
   ctx->port = pn_strdup(port);
 
-  pn_selectable_t *selectable = pni_selectable(pni_listener_capacity,
-                                               pni_listener_pending,
-                                               pni_listener_deadline,
-                                               pni_listener_readable,
-                                               pni_listener_writable,
-                                               pni_listener_expired,
-                                               pni_listener_finalize);
+  pn_selectable_t *selectable = pn_selectable();
+  pn_selectable_set_reading(selectable, true);
+  pn_selectable_on_readable(selectable, pni_listener_readable);
+  pn_selectable_on_finalize(selectable, pni_listener_finalize);
   pn_selectable_set_fd(selectable, socket);
   pni_selectable_set_context(selectable, ctx);
   pn_list_add(messenger->pending, selectable);
@@ -459,13 +444,12 @@ static pn_connection_ctx_t *pn_connection_ctx(pn_messenger_t *messenger,
   ctx = (pn_connection_ctx_t *) malloc(sizeof(pn_connection_ctx_t));
   ctx->messenger = messenger;
   ctx->connection = conn;
-  ctx->selectable = pni_selectable(pni_connection_capacity,
-                                   pni_connection_pending,
-                                   pni_connection_deadline,
-                                   pni_connection_readable,
-                                   pni_connection_writable,
-                                   pni_connection_expired,
-                                   pni_connection_finalize);
+  pn_selectable_t *sel = pn_selectable();
+  ctx->selectable = sel;
+  pn_selectable_on_readable(sel, pni_connection_readable);
+  pn_selectable_on_writable(sel, pni_connection_writable);
+  pn_selectable_on_expired(sel, pni_connection_expired);
+  pn_selectable_on_finalize(sel, pni_connection_finalize);
   pn_selectable_set_fd(ctx->selectable, sock);
   pni_selectable_set_context(ctx->selectable, ctx);
   pn_list_add(messenger->pending, ctx->selectable);
@@ -477,7 +461,6 @@ static pn_connection_ctx_t *pn_connection_ctx(pn_messenger_t *messenger,
   ctx->port = pn_strdup(port);
   ctx->listener = lnr;
   pn_connection_set_context(conn, ctx);
-
   return ctx;
 }
 
@@ -559,21 +542,6 @@ static void link_ctx_release( pn_messenger_t *messenger, pn_link_t *link )
   }
 }
 
-static ssize_t pni_interruptor_capacity(pn_selectable_t *sel)
-{
-  return 1024;
-}
-
-static ssize_t pni_interruptor_pending(pn_selectable_t *sel)
-{
-  return 0;
-}
-
-static pn_timestamp_t pni_interruptor_deadline(pn_selectable_t *sel)
-{
-  return 0;
-}
-
 static void pni_interruptor_readable(pn_selectable_t *sel)
 {
   pn_messenger_t *messenger = (pn_messenger_t *) pni_selectable_get_context(sel);
@@ -582,16 +550,6 @@ static void pni_interruptor_readable(pn_selectable_t *sel)
   messenger->interrupted = true;
 }
 
-static void pni_interruptor_writable(pn_selectable_t *sel)
-{
-  // do nothing
-}
-
-static void pni_interruptor_expired(pn_selectable_t *sel)
-{
-  // do nothing
-}
-
 static void pni_interruptor_finalize(pn_selectable_t *sel)
 {
   pn_messenger_t *messenger = (pn_messenger_t *) pni_selectable_get_context(sel);
@@ -613,11 +571,10 @@ pn_messenger_t *pn_messenger(const char *name)
     m->passive = false;
     m->io = pn_io();
     m->pending = pn_list(PN_WEAKREF, 0);
-    m->interruptor = pni_selectable
-      (pni_interruptor_capacity, pni_interruptor_pending,
-       pni_interruptor_deadline, pni_interruptor_readable,
-       pni_interruptor_writable, pni_interruptor_expired,
-       pni_interruptor_finalize);
+    m->interruptor = pn_selectable();
+    pn_selectable_set_reading(m->interruptor, true);
+    pn_selectable_on_readable(m->interruptor, pni_interruptor_readable);
+    pn_selectable_on_finalize(m->interruptor, pni_interruptor_finalize);
     pn_list_add(m->pending, m->interruptor);
     m->interrupted = false;
     // Explicitly initialise pipe file descriptors to invalid values in case pipe
@@ -1659,10 +1616,10 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add
     pn_messenger_connection(messenger, sock, scheme, user, pass, host, port, NULL);
   pn_transport_t *transport = pn_transport();
   pn_transport_bind(transport, connection);
+  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
+  pn_selectable_t *sel = ctx->selectable;
   err = pn_transport_config(messenger, connection);
   if (err) {
-    pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
-    pn_selectable_t *sel = ctx->selectable;
     pn_selectable_free(sel);
     messenger->connection_error = err;
     return NULL;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/posix/selector.c
----------------------------------------------------------------------
diff --git a/proton-c/src/posix/selector.c b/proton-c/src/posix/selector.c
index 0fd8918..fa38d1c 100644
--- a/proton-c/src/posix/selector.c
+++ b/proton-c/src/posix/selector.c
@@ -99,13 +99,13 @@ void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
   selector->fds[idx].fd = pn_selectable_get_fd(selectable);
   selector->fds[idx].events = 0;
   selector->fds[idx].revents = 0;
-  if (pn_selectable_capacity(selectable) > 0) {
+  if (pn_selectable_is_reading(selectable)) {
     selector->fds[idx].events |= POLLIN;
   }
-  if (pn_selectable_pending(selectable) > 0) {
+  if (pn_selectable_is_writing(selectable)) {
     selector->fds[idx].events |= POLLOUT;
   }
-  selector->deadlines[idx] = pn_selectable_deadline(selectable);
+  selector->deadlines[idx] = pn_selectable_get_deadline(selectable);
 }
 
 void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/reactor/acceptor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c
index 8942109..a9b7dab 100644
--- a/proton-c/src/reactor/acceptor.c
+++ b/proton-c/src/reactor/acceptor.c
@@ -26,10 +26,6 @@
 #include "reactor.h"
 #include "selectable.h"
 
-static ssize_t pni_acceptor_capacity(pn_selectable_t *sel) {
-  return 1;
-}
-
 pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport);
 
 void pni_acceptor_readable(pn_selectable_t *sel) {
@@ -57,14 +53,14 @@ void pni_acceptor_finalize(pn_selectable_t *sel) {
 
 pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port, pn_handler_t *handler) {
   pn_selectable_t *sel = pn_reactor_selectable(reactor);
-  pn_selectable_set_capacity(sel, pni_acceptor_capacity);
-  pn_selectable_set_readable(sel, pni_acceptor_readable);
-  pn_selectable_set_finalize(sel, pni_acceptor_finalize);
   pn_socket_t socket = pn_listen(pn_reactor_io(reactor), host, port);
   pn_selectable_set_fd(sel, socket);
+  pn_selectable_on_readable(sel, pni_acceptor_readable);
+  pn_selectable_on_finalize(sel, pni_acceptor_finalize);
   pni_selectable_set_context(sel, reactor);
   pni_record_init_reactor(pn_selectable_attachments(sel), reactor);
   pni_record_init_handler(pn_selectable_attachments(sel), handler);
+  pn_selectable_set_reading(sel, true);
   pn_reactor_update(reactor, sel);
   return (pn_acceptor_t *) sel;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/reactor/connection.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c
index 73bd4ff..0987e41 100644
--- a/proton-c/src/reactor/connection.c
+++ b/proton-c/src/reactor/connection.c
@@ -33,12 +33,55 @@
 static void *pni_transportctx = NULL;
 #define PN_TRANCTX ((pn_handle_t) &pni_transportctx)
 
+static pn_transport_t *pni_transport(pn_selectable_t *sel) {
+  pn_record_t *record = pn_selectable_attachments(sel);
+  return (pn_transport_t *) pn_record_get(record, PN_TRANCTX);
+}
+
+static ssize_t pni_connection_capacity(pn_selectable_t *sel)
+{
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t capacity = pn_transport_capacity(transport);
+  if (capacity < 0) {
+    if (pn_transport_closed(transport)) {
+      pn_selectable_terminate(sel);
+    }
+  }
+  return capacity;
+}
+
+static ssize_t pni_connection_pending(pn_selectable_t *sel)
+{
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t pending = pn_transport_pending(transport);
+  if (pending < 0) {
+    if (pn_transport_closed(transport)) {
+      pn_selectable_terminate(sel);
+    }
+  }
+  return pending;
+}
+
+static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
+{
+  return 0;
+}
+
+static void pni_connection_update(pn_selectable_t *sel) {
+  ssize_t c = pni_connection_capacity(sel);
+  ssize_t p = pni_connection_pending(sel);
+  pn_selectable_set_reading(sel, c > 0);
+  pn_selectable_set_writing(sel, p > 0);
+  pn_selectable_set_deadline(sel, pni_connection_deadline(sel));
+}
+
 void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event) {
   assert(reactor);
   pn_transport_t *transport = pn_event_transport(event);
   pn_record_t *record = pn_transport_attachments(transport);
   pn_selectable_t *sel = (pn_selectable_t *) pn_record_get(record, PN_TRANCTX);
   if (sel && !pn_selectable_is_terminal(sel)) {
+    pni_connection_update(sel);
     pn_reactor_update(reactor, sel);
   }
 }
@@ -80,40 +123,6 @@ void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event) {
   pn_list_remove(pn_reactor_children(reactor), conn);
 }
 
-static pn_transport_t *pni_transport(pn_selectable_t *sel) {
-  pn_record_t *record = pn_selectable_attachments(sel);
-  return (pn_transport_t *) pn_record_get(record, PN_TRANCTX);
-}
-
-static ssize_t pni_connection_capacity(pn_selectable_t *sel)
-{
-  pn_transport_t *transport = pni_transport(sel);
-  ssize_t capacity = pn_transport_capacity(transport);
-  if (capacity < 0) {
-    if (pn_transport_closed(transport)) {
-      pn_selectable_terminate(sel);
-    }
-  }
-  return capacity;
-}
-
-static ssize_t pni_connection_pending(pn_selectable_t *sel)
-{
-  pn_transport_t *transport = pni_transport(sel);
-  ssize_t pending = pn_transport_pending(transport);
-  if (pending < 0) {
-    if (pn_transport_closed(transport)) {
-      pn_selectable_terminate(sel);
-    }
-  }
-  return pending;
-}
-
-static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
-{
-  return 0;
-}
-
 static void pni_connection_readable(pn_selectable_t *sel)
 {
   pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
@@ -134,6 +143,7 @@ static void pni_connection_readable(pn_selectable_t *sel)
 
   ssize_t newcap = pn_transport_capacity(transport);
   if (newcap != capacity) {
+    pni_connection_update(sel);
     pn_reactor_update(reactor, sel);
   }
 }
@@ -158,6 +168,7 @@ static void pni_connection_writable(pn_selectable_t *sel)
 
   ssize_t newpending = pn_transport_pending(transport);
   if (newpending != pending) {
+    pni_connection_update(sel);
     pn_reactor_update(reactor, sel);
   }
 }
@@ -175,14 +186,11 @@ static void pni_connection_finalize(pn_selectable_t *sel) {
 
 pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport) {
   pn_selectable_t *sel = pn_reactor_selectable(reactor);
-  pn_selectable_set_capacity(sel, pni_connection_capacity);
-  pn_selectable_set_pending(sel, pni_connection_pending);
-  pn_selectable_set_deadline(sel, pni_connection_deadline);
-  pn_selectable_set_readable(sel, pni_connection_readable);
-  pn_selectable_set_writable(sel, pni_connection_writable);
-  pn_selectable_set_expired(sel, pni_connection_expired);
-  pn_selectable_set_finalize(sel, pni_connection_finalize);
   pn_selectable_set_fd(sel, sock);
+  pn_selectable_on_readable(sel, pni_connection_readable);
+  pn_selectable_on_writable(sel, pni_connection_writable);
+  pn_selectable_on_expired(sel, pni_connection_expired);
+  pn_selectable_on_finalize(sel, pni_connection_finalize);
   pni_selectable_set_context(sel, reactor);
   pn_record_t *record = pn_selectable_attachments(sel);
   pn_record_def(record, PN_TRANCTX, PN_OBJECT);
@@ -191,6 +199,7 @@ pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socke
   pn_record_def(tr, PN_TRANCTX, PN_WEAKREF);
   pn_record_set(tr, PN_TRANCTX, sel);
   pni_record_init_reactor(tr, reactor);
+  pni_connection_update(sel);
   pn_reactor_update(reactor, sel);
   return sel;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index b17473c..c3c5098 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -98,14 +98,14 @@ static void pni_timer_expired(pn_selectable_t *sel) {
 
 pn_selectable_t *pni_selectable_timer(pn_reactor_t *reactor) {
   pn_selectable_t *sel = pn_reactor_selectable(reactor);
-  pn_selectable_set_deadline(sel, pni_timer_deadline);
-  pn_selectable_set_expired(sel, pni_timer_expired);
+  pn_selectable_on_expired(sel, pni_timer_expired);
   pni_selectable_set_context(sel, reactor);
   pn_record_t *record = pn_selectable_attachments(sel);
   pn_record_def(record, 0x1, PN_OBJECT);
   pn_timer_t *timer = pn_timer(reactor->collector);
   pn_record_set(record, 0x1, timer);
   pn_decref(timer);
+  pn_selectable_set_deadline(sel, pni_timer_deadline(sel));
   pn_reactor_update(reactor, sel);
   return sel;
 }
@@ -299,6 +299,7 @@ pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *h
   pn_record_t *record = pn_task_attachments(task);
   pni_record_init_reactor(record, reactor);
   pni_record_init_handler(record, handler);
+  pn_selectable_set_deadline(reactor->timer, pni_timer_deadline(reactor->timer));
   pn_reactor_update(reactor, reactor->timer);
   return task;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/selectable.c
----------------------------------------------------------------------
diff --git a/proton-c/src/selectable.c b/proton-c/src/selectable.c
index 35d9637..1f295ff 100644
--- a/proton-c/src/selectable.c
+++ b/proton-c/src/selectable.c
@@ -43,122 +43,119 @@ void pn_selectables_free(pn_selectables_t *selectables)
 struct pn_selectable_t {
   pn_socket_t fd;
   int index;
-  pn_record_t *context;
-  ssize_t (*capacity)(pn_selectable_t *);
-  ssize_t (*pending)(pn_selectable_t *);
-  pn_timestamp_t (*deadline)(pn_selectable_t *);
+  pn_record_t *attachments;
   void (*readable)(pn_selectable_t *);
   void (*writable)(pn_selectable_t *);
   void (*expired)(pn_selectable_t *);
   void (*finalize)(pn_selectable_t *);
+  pn_collector_t *collector;
+  pn_timestamp_t deadline;
+  bool reading;
+  bool writing;
   bool registered;
   bool terminal;
 };
 
-void pn_selectable_initialize(void *obj)
+void pn_selectable_initialize(pn_selectable_t *sel)
 {
-  pn_selectable_t *sel = (pn_selectable_t *) obj;
   sel->fd = PN_INVALID_SOCKET;
   sel->index = -1;
-  sel->context = pn_record();
-  sel->capacity = NULL;
-  sel->deadline = NULL;
-  sel->pending = NULL;
+  sel->attachments = pn_record();
   sel->readable = NULL;
   sel->writable = NULL;
   sel->expired = NULL;
   sel->finalize = NULL;
+  sel->collector = NULL;
+  sel->deadline = 0;
+  sel->reading = false;
+  sel->writing = false;
   sel->registered = false;
   sel->terminal = false;
 }
 
-void pn_selectable_finalize(void *obj)
+void pn_selectable_finalize(pn_selectable_t *sel)
 {
-  pn_selectable_t *sel = (pn_selectable_t *) obj;
   if (sel->finalize) {
     sel->finalize(sel);
   }
-  pn_free(sel->context);
+  pn_decref(sel->attachments);
 }
 
 #define pn_selectable_hashcode NULL
 #define pn_selectable_inspect NULL
 #define pn_selectable_compare NULL
 
+PN_CLASSDEF(pn_selectable)
+
 pn_selectable_t *pn_selectable(void)
 {
-  static const pn_class_t clazz = PN_CLASS(pn_selectable);
-  return (pn_selectable_t *) pn_class_new(&clazz, sizeof(pn_selectable_t));
+  return pn_selectable_new();
+}
+
+bool pn_selectable_is_reading(pn_selectable_t *sel) {
+  assert(sel);
+  return sel->reading;
+}
+
+void pn_selectable_set_reading(pn_selectable_t *sel, bool reading) {
+  assert(sel);
+  sel->reading = reading;
+}
+
+bool pn_selectable_is_writing(pn_selectable_t *sel) {
+  assert(sel);
+  return sel->writing;
 }
 
-void pn_selectable_set_capacity(pn_selectable_t *sel, ssize_t (*capacity)(pn_selectable_t *)) {
+void pn_selectable_set_writing(pn_selectable_t *sel, bool writing) {
   assert(sel);
-  sel->capacity = capacity;
+  sel->writing = writing;
 }
 
-void pn_selectable_set_pending(pn_selectable_t *sel, ssize_t (*pending)(pn_selectable_t *)) {
+pn_timestamp_t pn_selectable_get_deadline(pn_selectable_t *sel) {
   assert(sel);
-  sel->pending = pending;
+  return sel->deadline;
 }
 
-void pn_selectable_set_deadline(pn_selectable_t *sel, pn_timestamp_t (*deadline)(pn_selectable_t *)) {
+void pn_selectable_set_deadline(pn_selectable_t *sel, pn_timestamp_t deadline) {
   assert(sel);
   sel->deadline = deadline;
 }
 
-void pn_selectable_set_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *)) {
+void pn_selectable_on_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *)) {
   assert(sel);
   sel->readable = readable;
 }
 
-void pn_selectable_set_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *)) {
+void pn_selectable_on_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *)) {
   assert(sel);
   sel->writable = writable;
 }
 
-void pn_selectable_set_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *)) {
+void pn_selectable_on_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *)) {
   assert(sel);
   sel->expired = expired;
 }
 
-void pn_selectable_set_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *)) {
+void pn_selectable_on_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *)) {
   assert(sel);
   sel->finalize = finalize;
 }
 
 pn_record_t *pn_selectable_attachments(pn_selectable_t *sel) {
-  return sel->context;
-}
-
-pn_selectable_t *pni_selectable(ssize_t (*capacity)(pn_selectable_t *),
-                                ssize_t (*pending)(pn_selectable_t *),
-                                pn_timestamp_t (*deadline)(pn_selectable_t *),
-                                void (*readable)(pn_selectable_t *),
-                                void (*writable)(pn_selectable_t *),
-                                void (*expired)(pn_selectable_t *),
-                                void (*finalize)(pn_selectable_t *))
-{
-  pn_selectable_t *selectable = pn_selectable();
-  selectable->capacity = capacity;
-  selectable->pending = pending;
-  selectable->readable = readable;
-  selectable->deadline = deadline;
-  selectable->writable = writable;
-  selectable->expired = expired;
-  selectable->finalize = finalize;
-  return selectable;
+  return sel->attachments;
 }
 
 void *pni_selectable_get_context(pn_selectable_t *selectable)
 {
   assert(selectable);
-  return pn_record_get(selectable->context, PN_LEGCTX);
+  return pn_record_get(selectable->attachments, PN_LEGCTX);
 }
 
 void pni_selectable_set_context(pn_selectable_t *selectable, void *context)
 {
   assert(selectable);
-  pn_record_set(selectable->context, PN_LEGCTX, context);
+  pn_record_set(selectable->attachments, PN_LEGCTX, context);
 }
 
 int pni_selectable_get_index(pn_selectable_t *selectable)
@@ -185,36 +182,6 @@ void pn_selectable_set_fd(pn_selectable_t *selectable, pn_socket_t fd)
   selectable->fd = fd;
 }
 
-ssize_t pn_selectable_capacity(pn_selectable_t *selectable)
-{
-  assert(selectable);
-  if (selectable->capacity) {
-    return selectable->capacity(selectable);
-  } else {
-    return 0;
-  }
-}
-
-ssize_t pn_selectable_pending(pn_selectable_t *selectable)
-{
-  assert(selectable);
-  if (selectable->pending) {
-    return selectable->pending(selectable);
-  } else {
-    return 0;
-  }
-}
-
-pn_timestamp_t pn_selectable_deadline(pn_selectable_t *selectable)
-{
-  assert(selectable);
-  if (selectable->deadline) {
-    return selectable->deadline(selectable);
-  } else {
-    return 0;
-  }
-}
-
 void pn_selectable_readable(pn_selectable_t *selectable)
 {
   assert(selectable);
@@ -254,10 +221,6 @@ void pn_selectable_set_registered(pn_selectable_t *selectable, bool registered)
 bool pn_selectable_is_terminal(pn_selectable_t *selectable)
 {
   assert(selectable);
-  if (!selectable->terminal) {
-    selectable->terminal = (pn_selectable_capacity(selectable) < 0 &&
-                            pn_selectable_pending(selectable) < 0);
-  }
   return selectable->terminal;
 }
 
@@ -271,3 +234,27 @@ void pn_selectable_free(pn_selectable_t *selectable)
 {
   pn_free(selectable);
 }
+
+static void pni_readable(pn_selectable_t *selectable) {
+  pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_READABLE);
+}
+
+static void pni_writable(pn_selectable_t *selectable) {
+  pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_WRITABLE);
+}
+
+static void pni_expired(pn_selectable_t *selectable) {
+  pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_EXPIRED);
+}
+
+void pn_selectable_collect(pn_selectable_t *selectable, pn_collector_t *collector) {
+  assert(selectable);
+  pn_decref(selectable->collector);
+  selectable->collector = collector;
+  pn_incref(selectable->collector);
+  if (collector) {
+    pn_selectable_on_readable(selectable, pni_readable);
+    pn_selectable_on_writable(selectable, pni_writable);
+    pn_selectable_on_expired(selectable, pni_expired);
+  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/selectable.h
----------------------------------------------------------------------
diff --git a/proton-c/src/selectable.h b/proton-c/src/selectable.h
index 69f719f..7a5b80b 100644
--- a/proton-c/src/selectable.h
+++ b/proton-c/src/selectable.h
@@ -28,13 +28,6 @@
 
 #include <proton/selectable.h>
 
-pn_selectable_t *pni_selectable(ssize_t (*capacity)(pn_selectable_t *),
-                                ssize_t (*pending)(pn_selectable_t *),
-                                pn_timestamp_t (*deadline)(pn_selectable_t *),
-                                void (*readable)(pn_selectable_t *),
-                                void (*writable)(pn_selectable_t *),
-                                void (*expired)(pn_selectable_t *),
-                                void (*finalize)(pn_selectable_t *));
 void *pni_selectable_get_context(pn_selectable_t *selectable);
 void pni_selectable_set_context(pn_selectable_t *selectable, void *context);
 int pni_selectable_get_index(pn_selectable_t *selectable);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/tests/python/proton_tests/messenger.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/messenger.py b/tests/python/proton_tests/messenger.py
index db815b7..b22c16d 100644
--- a/tests/python/proton_tests/messenger.py
+++ b/tests/python/proton_tests/messenger.py
@@ -961,9 +961,9 @@ class Pump:
         sel.free()
         self.selectables.remove(sel)
       else:
-        if sel.capacity > 0:
+        if sel.reading:
           reading.append(sel)
-        if sel.pending > 0:
+        if sel.writing:
           writing.append(sel)
 
     readable, writable, _ = select(reading, writing, [], 0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org