You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/01/17 02:56:53 UTC
qpid-proton git commit: added pn_selectable_collect and modified
callbacks to allow for embedding languages to define their own selectables
Repository: qpid-proton
Updated Branches:
refs/heads/master 6982026d7 -> c69cfe64f
added pn_selectable_collect and modified callbacks to allow for embedding languages to define their own selectables
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c69cfe64
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c69cfe64
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c69cfe64
Branch: refs/heads/master
Commit: c69cfe64f8f7190c973da297eb7761de4e9a1eb5
Parents: 6982026
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Fri Jan 16 20:41:26 2015 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Fri Jan 16 20:46:07 2015 -0500
----------------------------------------------------------------------
proton-c/bindings/javascript/CMakeLists.txt | 2 +-
proton-c/bindings/javascript/module.js | 6 +-
proton-c/bindings/python/proton/__init__.py | 44 +++++--
proton-c/bindings/python/setup.py | 30 +++++
proton-c/include/proton/event.h | 6 +-
proton-c/include/proton/selectable.h | 49 ++++----
proton-c/src/events/event.c | 6 +
proton-c/src/messenger/messenger.c | 101 +++++-----------
proton-c/src/posix/selector.c | 6 +-
proton-c/src/reactor/acceptor.c | 10 +-
proton-c/src/reactor/connection.c | 91 +++++++-------
proton-c/src/reactor/reactor.c | 5 +-
proton-c/src/selectable.c | 147 +++++++++++------------
proton-c/src/selectable.h | 7 --
tests/python/proton_tests/messenger.py | 4 +-
15 files changed, 262 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/bindings/javascript/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/CMakeLists.txt b/proton-c/bindings/javascript/CMakeLists.txt
index 288201f..d10212c 100644
--- a/proton-c/bindings/javascript/CMakeLists.txt
+++ b/proton-c/bindings/javascript/CMakeLists.txt
@@ -219,7 +219,7 @@ set_target_properties(
# fiddly with node.js packages. This behaviour might be reinstated if the
# packaging mechanism improves.
- LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" ${EMSCRIPTEN_LINK_OPTIMISATIONS} --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/module.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/error.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/messenger.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/subscription.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/message.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-uuid.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-symbol.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-described.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-array.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-typed-number.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-long.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-binary.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_pn_get_version_major', '_
pn_get_version_minor', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_recv', '_pn_messenger_receiving', '_pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming', '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_messenger_set_passive', '_pn_messenger_selectable', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_su
bscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_p
n_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_p
n_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_decimal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump', '_pn_selectable_readable', '_pn_selectable_capacity', '_pn_selectable_writable', '_pn_selectable_pending', '_pn_selectable_is_terminal'
, '_pn_selectable_fd', '_pn_selectable_free']\""
+ LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" ${EMSCRIPTEN_LINK_OPTIMISATIONS} --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/module.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/error.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/messenger.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/subscription.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/message.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-uuid.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-symbol.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-described.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-array.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-typed-number.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-long.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-binary.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_pn_get_version_major', '_
pn_get_version_minor', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_recv', '_pn_messenger_receiving', '_pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming', '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_messenger_set_passive', '_pn_messenger_selectable', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_su
bscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_p
n_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_p
n_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_decimal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump', '_pn_selectable_readable', '_pn_selectable_is_reading', '_pn_selectable_writable', '_pn_selectable_is_writing', '_pn_selectable_is_term
inal', '_pn_selectable_get_fd', '_pn_selectable_free']\""
)
# This command packages up the compiled proton.js into a node.js package called
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/bindings/javascript/module.js
----------------------------------------------------------------------
diff --git a/proton-c/bindings/javascript/module.js b/proton-c/bindings/javascript/module.js
index 59d2517..3603de3 100644
--- a/proton-c/bindings/javascript/module.js
+++ b/proton-c/bindings/javascript/module.js
@@ -221,7 +221,7 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl
for (var name in _messengers) {
var messenger = _messengers[name];
while ((sel = _pn_messenger_selectable(messenger._messenger))) {
- fd = _pn_selectable_fd(sel);
+ fd = _pn_selectable_get_fd(sel);
// Only register valid selectables, otherwise free them.
if (fd === -1) {
_pn_selectable_free(sel);
@@ -268,8 +268,8 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl
if (sock.sock_ops.poll) {
var mask = sock.sock_ops.poll(sock); // Low-level poll call.
if (mask) {
- var capacity = _pn_selectable_capacity(sel) > 0;
- var pending = _pn_selectable_pending(sel) > 0;
+ var capacity = _pn_selectable_is_reading(sel);
+ var pending = _pn_selectable_is_writing(sel);
if ((mask & POLLIN) && capacity) {
//console.log("- readable fd = " + fd + ", capacity = " + _pn_selectable_capacity(sel));
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 6e0a44d..98cb9d7 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -1156,35 +1156,57 @@ class Subscription(object):
def address(self):
return pn_subscription_address(self._impl)
+_DEFAULT = object()
+
class Selectable(object):
def __init__(self, messenger, impl):
self.messenger = messenger
self._impl = impl
- def fileno(self):
+ def fileno(self, fd = _DEFAULT):
if not self._impl: raise ValueError("selectable freed")
- return pn_selectable_get_fd(self._impl)
+ if fd is _DEFAULT:
+ return pn_selectable_get_fd(self._impl)
+ elif fd is None:
+ pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
+ else:
+ pn_selectable_set_fd(self._impl, fd)
- @property
- def capacity(self):
+ def _is_reading(self):
if not self._impl: raise ValueError("selectable freed")
- return pn_selectable_capacity(self._impl)
+ return pn_selectable_is_reading(self._impl)
- @property
- def pending(self):
+ def _set_reading(self, val):
if not self._impl: raise ValueError("selectable freed")
- return pn_selectable_pending(self._impl)
+ pn_selectable_set_reading(self._impl, bool(val))
- @property
- def deadline(self):
+ reading = property(_is_reading, _set_reading)
+
+ def _is_writing(self):
if not self._impl: raise ValueError("selectable freed")
- tstamp = pn_selectable_deadline(self._impl)
+ return pn_selectable_is_writing(self._impl)
+
+ def _set_writing(self, val):
+ if not self._impl: raise ValueError("selectable freed")
+ pn_selectable_set_writing(self._impl, bool(val))
+
+ writing = property(_is_writing, _set_writing)
+
+ def _get_deadline(self):
+ if not self._impl: raise ValueError("selectable freed")
+ tstamp = pn_selectable_get_deadline(self._impl)
if tstamp:
return millis2secs(tstamp)
else:
return None
+ def _set_deadline(self, deadline):
+ if not self._impl: raise ValueError("selectable freed")
+ pn_selectable_set_deadline(self._impl, secs2millis(deadline))
+
+ deadline = property(_get_deadline, _set_deadline)
+
def readable(self):
if not self._impl: raise ValueError("selectable freed")
pn_selectable_readable(self._impl)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/bindings/python/setup.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/setup.py b/proton-c/bindings/python/setup.py
new file mode 100755
index 0000000..9765342
--- /dev/null
+++ b/proton-c/bindings/python/setup.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from distutils.core import setup, Extension
+
+setup(name='Proton',
+ version='0.9',
+ description='An AMQP based messaging library.',
+ author='Apache Qpid',
+ author_email='proton@qpid.apache.org',
+ url='http://qpid.apache.org/proton/',
+ packages=['proton'],
+ ext_modules=[Extension('cproton', ['cproton.i'])])
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 34d60c3..ae19e49 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -281,7 +281,11 @@ typedef enum {
* Indicates that the both the head and tail of the transport are
* closed. Events of this type point to the relevant transport.
*/
- PN_TRANSPORT_CLOSED
+ PN_TRANSPORT_CLOSED,
+
+ PN_SELECTABLE_READABLE,
+ PN_SELECTABLE_WRITABLE,
+ PN_SELECTABLE_EXPIRED
} pn_event_type_t;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/include/proton/selectable.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/selectable.h b/proton-c/include/proton/selectable.h
index 884efd2..615b72b 100644
--- a/proton-c/include/proton/selectable.h
+++ b/proton-c/include/proton/selectable.h
@@ -24,6 +24,7 @@
#include <proton/import_export.h>
#include <proton/object.h>
+#include <proton/event.h>
#include <proton/io.h>
#include <proton/type_compat.h>
@@ -92,13 +93,10 @@ PN_EXTERN void pn_selectables_free(pn_selectables_t *selectables);
PN_EXTERN pn_selectable_t *pn_selectable(void);
-PN_EXTERN void pn_selectable_set_capacity(pn_selectable_t *sel, ssize_t (*capacity)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_pending(pn_selectable_t *sel, ssize_t (*pending)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_deadline(pn_selectable_t *sel, pn_timestamp_t (*deadline)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *));
-PN_EXTERN void pn_selectable_set_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *));
+PN_EXTERN void pn_selectable_on_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *));
+PN_EXTERN void pn_selectable_on_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *));
+PN_EXTERN void pn_selectable_on_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *));
+PN_EXTERN void pn_selectable_on_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *));
PN_EXTERN pn_record_t *pn_selectable_attachments(pn_selectable_t *sel);
@@ -119,28 +117,24 @@ PN_EXTERN pn_socket_t pn_selectable_get_fd(pn_selectable_t *selectable);
PN_EXTERN void pn_selectable_set_fd(pn_selectable_t *selectable, pn_socket_t fd);
/**
- * Get the capacity of a selectable.
- *
- * A selectable with a positive capacity is interested in being
- * notified of read events. A negative capacity indicates that the
- * selectable will never be interested in read events ever again.
+ * Check if a selectable is interested in readable events.
*
* @param[in] selectable a selectable object
- * @return the selectables capacity
+ * @return true iff the selectable is interested in read events
*/
-PN_EXTERN ssize_t pn_selectable_capacity(pn_selectable_t *selectable);
+PN_EXTERN bool pn_selectable_is_reading(pn_selectable_t *selectable);
+
+PN_EXTERN void pn_selectable_set_reading(pn_selectable_t *sel, bool reading);
/**
- * Get the number of bytes pending for a selectable.
- *
- * A selectable with pending bytes is interested in being notified of
- * write events. If this value is negative then the selectable will
- * never be interested in write events ever again.
+ * Check if a selectable is interested in writable events.
*
* @param[in] selectable a selectable object
- * @return the number of bytes pending for the selectable
+ * @return true iff the selectable is interested in writable events
*/
-PN_EXTERN ssize_t pn_selectable_pending(pn_selectable_t *selectable);
+PN_EXTERN bool pn_selectable_is_writing(pn_selectable_t *selectable);
+
+ PN_EXTERN void pn_selectable_set_writing(pn_selectable_t *sel, bool writing);
/**
* Get the next deadline for a selectable.
@@ -152,7 +146,9 @@ PN_EXTERN ssize_t pn_selectable_pending(pn_selectable_t *selectable);
* @param[in] selectable a selectable object
* @return the next deadline or zero
*/
-PN_EXTERN pn_timestamp_t pn_selectable_deadline(pn_selectable_t *selectable);
+PN_EXTERN pn_timestamp_t pn_selectable_get_deadline(pn_selectable_t *selectable);
+
+PN_EXTERN void pn_selectable_set_deadline(pn_selectable_t *sel, pn_timestamp_t deadline);
/**
* Notify a selectable that the file descriptor is readable.
@@ -226,6 +222,15 @@ PN_EXTERN void pn_selectable_terminate(pn_selectable_t *selectable);
PN_EXTERN void pn_selectable_free(pn_selectable_t *selectable);
/**
+ * Configure a selectable with a set of callbacks that emit events
+ * into the provided collector.
+ *
+ * @param[in] selectable a selectable object
+ * @param[in] collector an event collector
+ */
+PN_EXTERN void pn_selectable_collect(pn_selectable_t *selectable, pn_collector_t *collector);
+
+/**
* @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/events/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/events/event.c b/proton-c/src/events/event.c
index 1670704..0abd8c4 100644
--- a/proton-c/src/events/event.c
+++ b/proton-c/src/events/event.c
@@ -310,6 +310,12 @@ const char *pn_event_type_name(pn_event_type_t type)
return "PN_TRANSPORT_TAIL_CLOSED";
case PN_TRANSPORT_CLOSED:
return "PN_TRANSPORT_CLOSED";
+ case PN_SELECTABLE_READABLE:
+ return "PN_SELECTABLE_READABLE";
+ case PN_SELECTABLE_WRITABLE:
+ return "PN_SELECTABLE_WRITABLE";
+ case PN_SELECTABLE_EXPIRED:
+ return "PN_SELECTABLE_EXPIRED";
}
return "<unrecognized>";
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/messenger/messenger.c
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c
index b46ca69..ec3fd3d 100644
--- a/proton-c/src/messenger/messenger.c
+++ b/proton-c/src/messenger/messenger.c
@@ -192,6 +192,17 @@ static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
return ctx->messenger->next_drain;
}
+static void pni_connection_update(pn_selectable_t *sel) {
+ ssize_t c = pni_connection_capacity(sel);
+ pn_selectable_set_reading(sel, c > 0);
+ ssize_t p = pni_connection_pending(sel);
+ pn_selectable_set_writing(sel, p > 0);
+ pn_selectable_set_deadline(sel, pni_connection_deadline(sel));
+ if (c < 0 && p < 0) {
+ pn_selectable_terminate(sel);
+ }
+}
+
#include <errno.h>
static void pn_error_report(const char *pfx, const char *error)
@@ -211,6 +222,7 @@ void pni_modified(pn_ctx_t *ctx)
void pni_conn_modified(pn_connection_ctx_t *ctx)
{
+ pni_connection_update(ctx->selectable);
pni_modified((pn_ctx_t *) ctx);
}
@@ -296,21 +308,6 @@ static void pni_connection_finalize(pn_selectable_t *sel)
pni_messenger_reclaim(ctx->messenger, ctx->connection);
}
-static ssize_t pni_listener_capacity(pn_selectable_t *sel)
-{
- return 1;
-}
-
-static ssize_t pni_listener_pending(pn_selectable_t *sel)
-{
- return 0;
-}
-
-static pn_timestamp_t pni_listener_deadline(pn_selectable_t *sel)
-{
- return 0;
-}
-
pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
pn_socket_t sock,
const char *scheme,
@@ -340,16 +337,7 @@ static void pni_listener_readable(pn_selectable_t *sel)
pn_connection_t *conn = pn_messenger_connection(ctx->messenger, sock, scheme, NULL, NULL, NULL, NULL, ctx);
pn_transport_bind(t, conn);
-}
-
-static void pni_listener_writable(pn_selectable_t *sel)
-{
- // do nothing
-}
-
-static void pni_listener_expired(pn_selectable_t *sel)
-{
- // do nothing
+ pni_conn_modified((pn_connection_ctx_t *) pn_connection_get_context(conn));
}
static void pn_listener_ctx_free(pn_messenger_t *messenger, pn_listener_ctx_t *ctx);
@@ -417,13 +405,10 @@ static pn_listener_ctx_t *pn_listener_ctx(pn_messenger_t *messenger,
ctx->host = pn_strdup(host);
ctx->port = pn_strdup(port);
- pn_selectable_t *selectable = pni_selectable(pni_listener_capacity,
- pni_listener_pending,
- pni_listener_deadline,
- pni_listener_readable,
- pni_listener_writable,
- pni_listener_expired,
- pni_listener_finalize);
+ pn_selectable_t *selectable = pn_selectable();
+ pn_selectable_set_reading(selectable, true);
+ pn_selectable_on_readable(selectable, pni_listener_readable);
+ pn_selectable_on_finalize(selectable, pni_listener_finalize);
pn_selectable_set_fd(selectable, socket);
pni_selectable_set_context(selectable, ctx);
pn_list_add(messenger->pending, selectable);
@@ -459,13 +444,12 @@ static pn_connection_ctx_t *pn_connection_ctx(pn_messenger_t *messenger,
ctx = (pn_connection_ctx_t *) malloc(sizeof(pn_connection_ctx_t));
ctx->messenger = messenger;
ctx->connection = conn;
- ctx->selectable = pni_selectable(pni_connection_capacity,
- pni_connection_pending,
- pni_connection_deadline,
- pni_connection_readable,
- pni_connection_writable,
- pni_connection_expired,
- pni_connection_finalize);
+ pn_selectable_t *sel = pn_selectable();
+ ctx->selectable = sel;
+ pn_selectable_on_readable(sel, pni_connection_readable);
+ pn_selectable_on_writable(sel, pni_connection_writable);
+ pn_selectable_on_expired(sel, pni_connection_expired);
+ pn_selectable_on_finalize(sel, pni_connection_finalize);
pn_selectable_set_fd(ctx->selectable, sock);
pni_selectable_set_context(ctx->selectable, ctx);
pn_list_add(messenger->pending, ctx->selectable);
@@ -477,7 +461,6 @@ static pn_connection_ctx_t *pn_connection_ctx(pn_messenger_t *messenger,
ctx->port = pn_strdup(port);
ctx->listener = lnr;
pn_connection_set_context(conn, ctx);
-
return ctx;
}
@@ -559,21 +542,6 @@ static void link_ctx_release( pn_messenger_t *messenger, pn_link_t *link )
}
}
-static ssize_t pni_interruptor_capacity(pn_selectable_t *sel)
-{
- return 1024;
-}
-
-static ssize_t pni_interruptor_pending(pn_selectable_t *sel)
-{
- return 0;
-}
-
-static pn_timestamp_t pni_interruptor_deadline(pn_selectable_t *sel)
-{
- return 0;
-}
-
static void pni_interruptor_readable(pn_selectable_t *sel)
{
pn_messenger_t *messenger = (pn_messenger_t *) pni_selectable_get_context(sel);
@@ -582,16 +550,6 @@ static void pni_interruptor_readable(pn_selectable_t *sel)
messenger->interrupted = true;
}
-static void pni_interruptor_writable(pn_selectable_t *sel)
-{
- // do nothing
-}
-
-static void pni_interruptor_expired(pn_selectable_t *sel)
-{
- // do nothing
-}
-
static void pni_interruptor_finalize(pn_selectable_t *sel)
{
pn_messenger_t *messenger = (pn_messenger_t *) pni_selectable_get_context(sel);
@@ -613,11 +571,10 @@ pn_messenger_t *pn_messenger(const char *name)
m->passive = false;
m->io = pn_io();
m->pending = pn_list(PN_WEAKREF, 0);
- m->interruptor = pni_selectable
- (pni_interruptor_capacity, pni_interruptor_pending,
- pni_interruptor_deadline, pni_interruptor_readable,
- pni_interruptor_writable, pni_interruptor_expired,
- pni_interruptor_finalize);
+ m->interruptor = pn_selectable();
+ pn_selectable_set_reading(m->interruptor, true);
+ pn_selectable_on_readable(m->interruptor, pni_interruptor_readable);
+ pn_selectable_on_finalize(m->interruptor, pni_interruptor_finalize);
pn_list_add(m->pending, m->interruptor);
m->interrupted = false;
// Explicitly initialise pipe file descriptors to invalid values in case pipe
@@ -1659,10 +1616,10 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add
pn_messenger_connection(messenger, sock, scheme, user, pass, host, port, NULL);
pn_transport_t *transport = pn_transport();
pn_transport_bind(transport, connection);
+ pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
+ pn_selectable_t *sel = ctx->selectable;
err = pn_transport_config(messenger, connection);
if (err) {
- pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
- pn_selectable_t *sel = ctx->selectable;
pn_selectable_free(sel);
messenger->connection_error = err;
return NULL;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/posix/selector.c
----------------------------------------------------------------------
diff --git a/proton-c/src/posix/selector.c b/proton-c/src/posix/selector.c
index 0fd8918..fa38d1c 100644
--- a/proton-c/src/posix/selector.c
+++ b/proton-c/src/posix/selector.c
@@ -99,13 +99,13 @@ void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
selector->fds[idx].fd = pn_selectable_get_fd(selectable);
selector->fds[idx].events = 0;
selector->fds[idx].revents = 0;
- if (pn_selectable_capacity(selectable) > 0) {
+ if (pn_selectable_is_reading(selectable)) {
selector->fds[idx].events |= POLLIN;
}
- if (pn_selectable_pending(selectable) > 0) {
+ if (pn_selectable_is_writing(selectable)) {
selector->fds[idx].events |= POLLOUT;
}
- selector->deadlines[idx] = pn_selectable_deadline(selectable);
+ selector->deadlines[idx] = pn_selectable_get_deadline(selectable);
}
void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/reactor/acceptor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c
index 8942109..a9b7dab 100644
--- a/proton-c/src/reactor/acceptor.c
+++ b/proton-c/src/reactor/acceptor.c
@@ -26,10 +26,6 @@
#include "reactor.h"
#include "selectable.h"
-static ssize_t pni_acceptor_capacity(pn_selectable_t *sel) {
- return 1;
-}
-
pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport);
void pni_acceptor_readable(pn_selectable_t *sel) {
@@ -57,14 +53,14 @@ void pni_acceptor_finalize(pn_selectable_t *sel) {
pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port, pn_handler_t *handler) {
pn_selectable_t *sel = pn_reactor_selectable(reactor);
- pn_selectable_set_capacity(sel, pni_acceptor_capacity);
- pn_selectable_set_readable(sel, pni_acceptor_readable);
- pn_selectable_set_finalize(sel, pni_acceptor_finalize);
pn_socket_t socket = pn_listen(pn_reactor_io(reactor), host, port);
pn_selectable_set_fd(sel, socket);
+ pn_selectable_on_readable(sel, pni_acceptor_readable);
+ pn_selectable_on_finalize(sel, pni_acceptor_finalize);
pni_selectable_set_context(sel, reactor);
pni_record_init_reactor(pn_selectable_attachments(sel), reactor);
pni_record_init_handler(pn_selectable_attachments(sel), handler);
+ pn_selectable_set_reading(sel, true);
pn_reactor_update(reactor, sel);
return (pn_acceptor_t *) sel;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/reactor/connection.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c
index 73bd4ff..0987e41 100644
--- a/proton-c/src/reactor/connection.c
+++ b/proton-c/src/reactor/connection.c
@@ -33,12 +33,55 @@
static void *pni_transportctx = NULL;
#define PN_TRANCTX ((pn_handle_t) &pni_transportctx)
+static pn_transport_t *pni_transport(pn_selectable_t *sel) {
+ pn_record_t *record = pn_selectable_attachments(sel);
+ return (pn_transport_t *) pn_record_get(record, PN_TRANCTX);
+}
+
+static ssize_t pni_connection_capacity(pn_selectable_t *sel)
+{
+ pn_transport_t *transport = pni_transport(sel);
+ ssize_t capacity = pn_transport_capacity(transport);
+ if (capacity < 0) {
+ if (pn_transport_closed(transport)) {
+ pn_selectable_terminate(sel);
+ }
+ }
+ return capacity;
+}
+
+static ssize_t pni_connection_pending(pn_selectable_t *sel)
+{
+ pn_transport_t *transport = pni_transport(sel);
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending < 0) {
+ if (pn_transport_closed(transport)) {
+ pn_selectable_terminate(sel);
+ }
+ }
+ return pending;
+}
+
+static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
+{
+ return 0;
+}
+
+static void pni_connection_update(pn_selectable_t *sel) {
+ ssize_t c = pni_connection_capacity(sel);
+ ssize_t p = pni_connection_pending(sel);
+ pn_selectable_set_reading(sel, c > 0);
+ pn_selectable_set_writing(sel, p > 0);
+ pn_selectable_set_deadline(sel, pni_connection_deadline(sel));
+}
+
void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event) {
assert(reactor);
pn_transport_t *transport = pn_event_transport(event);
pn_record_t *record = pn_transport_attachments(transport);
pn_selectable_t *sel = (pn_selectable_t *) pn_record_get(record, PN_TRANCTX);
if (sel && !pn_selectable_is_terminal(sel)) {
+ pni_connection_update(sel);
pn_reactor_update(reactor, sel);
}
}
@@ -80,40 +123,6 @@ void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event) {
pn_list_remove(pn_reactor_children(reactor), conn);
}
-static pn_transport_t *pni_transport(pn_selectable_t *sel) {
- pn_record_t *record = pn_selectable_attachments(sel);
- return (pn_transport_t *) pn_record_get(record, PN_TRANCTX);
-}
-
-static ssize_t pni_connection_capacity(pn_selectable_t *sel)
-{
- pn_transport_t *transport = pni_transport(sel);
- ssize_t capacity = pn_transport_capacity(transport);
- if (capacity < 0) {
- if (pn_transport_closed(transport)) {
- pn_selectable_terminate(sel);
- }
- }
- return capacity;
-}
-
-static ssize_t pni_connection_pending(pn_selectable_t *sel)
-{
- pn_transport_t *transport = pni_transport(sel);
- ssize_t pending = pn_transport_pending(transport);
- if (pending < 0) {
- if (pn_transport_closed(transport)) {
- pn_selectable_terminate(sel);
- }
- }
- return pending;
-}
-
-static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
-{
- return 0;
-}
-
static void pni_connection_readable(pn_selectable_t *sel)
{
pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
@@ -134,6 +143,7 @@ static void pni_connection_readable(pn_selectable_t *sel)
ssize_t newcap = pn_transport_capacity(transport);
if (newcap != capacity) {
+ pni_connection_update(sel);
pn_reactor_update(reactor, sel);
}
}
@@ -158,6 +168,7 @@ static void pni_connection_writable(pn_selectable_t *sel)
ssize_t newpending = pn_transport_pending(transport);
if (newpending != pending) {
+ pni_connection_update(sel);
pn_reactor_update(reactor, sel);
}
}
@@ -175,14 +186,11 @@ static void pni_connection_finalize(pn_selectable_t *sel) {
pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport) {
pn_selectable_t *sel = pn_reactor_selectable(reactor);
- pn_selectable_set_capacity(sel, pni_connection_capacity);
- pn_selectable_set_pending(sel, pni_connection_pending);
- pn_selectable_set_deadline(sel, pni_connection_deadline);
- pn_selectable_set_readable(sel, pni_connection_readable);
- pn_selectable_set_writable(sel, pni_connection_writable);
- pn_selectable_set_expired(sel, pni_connection_expired);
- pn_selectable_set_finalize(sel, pni_connection_finalize);
pn_selectable_set_fd(sel, sock);
+ pn_selectable_on_readable(sel, pni_connection_readable);
+ pn_selectable_on_writable(sel, pni_connection_writable);
+ pn_selectable_on_expired(sel, pni_connection_expired);
+ pn_selectable_on_finalize(sel, pni_connection_finalize);
pni_selectable_set_context(sel, reactor);
pn_record_t *record = pn_selectable_attachments(sel);
pn_record_def(record, PN_TRANCTX, PN_OBJECT);
@@ -191,6 +199,7 @@ pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socke
pn_record_def(tr, PN_TRANCTX, PN_WEAKREF);
pn_record_set(tr, PN_TRANCTX, sel);
pni_record_init_reactor(tr, reactor);
+ pni_connection_update(sel);
pn_reactor_update(reactor, sel);
return sel;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index b17473c..c3c5098 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -98,14 +98,14 @@ static void pni_timer_expired(pn_selectable_t *sel) {
pn_selectable_t *pni_selectable_timer(pn_reactor_t *reactor) {
pn_selectable_t *sel = pn_reactor_selectable(reactor);
- pn_selectable_set_deadline(sel, pni_timer_deadline);
- pn_selectable_set_expired(sel, pni_timer_expired);
+ pn_selectable_on_expired(sel, pni_timer_expired);
pni_selectable_set_context(sel, reactor);
pn_record_t *record = pn_selectable_attachments(sel);
pn_record_def(record, 0x1, PN_OBJECT);
pn_timer_t *timer = pn_timer(reactor->collector);
pn_record_set(record, 0x1, timer);
pn_decref(timer);
+ pn_selectable_set_deadline(sel, pni_timer_deadline(sel));
pn_reactor_update(reactor, sel);
return sel;
}
@@ -299,6 +299,7 @@ pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *h
pn_record_t *record = pn_task_attachments(task);
pni_record_init_reactor(record, reactor);
pni_record_init_handler(record, handler);
+ pn_selectable_set_deadline(reactor->timer, pni_timer_deadline(reactor->timer));
pn_reactor_update(reactor, reactor->timer);
return task;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/selectable.c
----------------------------------------------------------------------
diff --git a/proton-c/src/selectable.c b/proton-c/src/selectable.c
index 35d9637..1f295ff 100644
--- a/proton-c/src/selectable.c
+++ b/proton-c/src/selectable.c
@@ -43,122 +43,119 @@ void pn_selectables_free(pn_selectables_t *selectables)
struct pn_selectable_t {
pn_socket_t fd;
int index;
- pn_record_t *context;
- ssize_t (*capacity)(pn_selectable_t *);
- ssize_t (*pending)(pn_selectable_t *);
- pn_timestamp_t (*deadline)(pn_selectable_t *);
+ pn_record_t *attachments;
void (*readable)(pn_selectable_t *);
void (*writable)(pn_selectable_t *);
void (*expired)(pn_selectable_t *);
void (*finalize)(pn_selectable_t *);
+ pn_collector_t *collector;
+ pn_timestamp_t deadline;
+ bool reading;
+ bool writing;
bool registered;
bool terminal;
};
-void pn_selectable_initialize(void *obj)
+void pn_selectable_initialize(pn_selectable_t *sel)
{
- pn_selectable_t *sel = (pn_selectable_t *) obj;
sel->fd = PN_INVALID_SOCKET;
sel->index = -1;
- sel->context = pn_record();
- sel->capacity = NULL;
- sel->deadline = NULL;
- sel->pending = NULL;
+ sel->attachments = pn_record();
sel->readable = NULL;
sel->writable = NULL;
sel->expired = NULL;
sel->finalize = NULL;
+ sel->collector = NULL;
+ sel->deadline = 0;
+ sel->reading = false;
+ sel->writing = false;
sel->registered = false;
sel->terminal = false;
}
-void pn_selectable_finalize(void *obj)
+void pn_selectable_finalize(pn_selectable_t *sel)
{
- pn_selectable_t *sel = (pn_selectable_t *) obj;
if (sel->finalize) {
sel->finalize(sel);
}
- pn_free(sel->context);
+ pn_decref(sel->attachments);
}
#define pn_selectable_hashcode NULL
#define pn_selectable_inspect NULL
#define pn_selectable_compare NULL
+PN_CLASSDEF(pn_selectable)
+
pn_selectable_t *pn_selectable(void)
{
- static const pn_class_t clazz = PN_CLASS(pn_selectable);
- return (pn_selectable_t *) pn_class_new(&clazz, sizeof(pn_selectable_t));
+ return pn_selectable_new();
+}
+
+bool pn_selectable_is_reading(pn_selectable_t *sel) {
+ assert(sel);
+ return sel->reading;
+}
+
+void pn_selectable_set_reading(pn_selectable_t *sel, bool reading) {
+ assert(sel);
+ sel->reading = reading;
+}
+
+bool pn_selectable_is_writing(pn_selectable_t *sel) {
+ assert(sel);
+ return sel->writing;
}
-void pn_selectable_set_capacity(pn_selectable_t *sel, ssize_t (*capacity)(pn_selectable_t *)) {
+void pn_selectable_set_writing(pn_selectable_t *sel, bool writing) {
assert(sel);
- sel->capacity = capacity;
+ sel->writing = writing;
}
-void pn_selectable_set_pending(pn_selectable_t *sel, ssize_t (*pending)(pn_selectable_t *)) {
+pn_timestamp_t pn_selectable_get_deadline(pn_selectable_t *sel) {
assert(sel);
- sel->pending = pending;
+ return sel->deadline;
}
-void pn_selectable_set_deadline(pn_selectable_t *sel, pn_timestamp_t (*deadline)(pn_selectable_t *)) {
+void pn_selectable_set_deadline(pn_selectable_t *sel, pn_timestamp_t deadline) {
assert(sel);
sel->deadline = deadline;
}
-void pn_selectable_set_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *)) {
+void pn_selectable_on_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *)) {
assert(sel);
sel->readable = readable;
}
-void pn_selectable_set_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *)) {
+void pn_selectable_on_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *)) {
assert(sel);
sel->writable = writable;
}
-void pn_selectable_set_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *)) {
+void pn_selectable_on_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *)) {
assert(sel);
sel->expired = expired;
}
-void pn_selectable_set_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *)) {
+void pn_selectable_on_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *)) {
assert(sel);
sel->finalize = finalize;
}
pn_record_t *pn_selectable_attachments(pn_selectable_t *sel) {
- return sel->context;
-}
-
-pn_selectable_t *pni_selectable(ssize_t (*capacity)(pn_selectable_t *),
- ssize_t (*pending)(pn_selectable_t *),
- pn_timestamp_t (*deadline)(pn_selectable_t *),
- void (*readable)(pn_selectable_t *),
- void (*writable)(pn_selectable_t *),
- void (*expired)(pn_selectable_t *),
- void (*finalize)(pn_selectable_t *))
-{
- pn_selectable_t *selectable = pn_selectable();
- selectable->capacity = capacity;
- selectable->pending = pending;
- selectable->readable = readable;
- selectable->deadline = deadline;
- selectable->writable = writable;
- selectable->expired = expired;
- selectable->finalize = finalize;
- return selectable;
+ return sel->attachments;
}
void *pni_selectable_get_context(pn_selectable_t *selectable)
{
assert(selectable);
- return pn_record_get(selectable->context, PN_LEGCTX);
+ return pn_record_get(selectable->attachments, PN_LEGCTX);
}
void pni_selectable_set_context(pn_selectable_t *selectable, void *context)
{
assert(selectable);
- pn_record_set(selectable->context, PN_LEGCTX, context);
+ pn_record_set(selectable->attachments, PN_LEGCTX, context);
}
int pni_selectable_get_index(pn_selectable_t *selectable)
@@ -185,36 +182,6 @@ void pn_selectable_set_fd(pn_selectable_t *selectable, pn_socket_t fd)
selectable->fd = fd;
}
-ssize_t pn_selectable_capacity(pn_selectable_t *selectable)
-{
- assert(selectable);
- if (selectable->capacity) {
- return selectable->capacity(selectable);
- } else {
- return 0;
- }
-}
-
-ssize_t pn_selectable_pending(pn_selectable_t *selectable)
-{
- assert(selectable);
- if (selectable->pending) {
- return selectable->pending(selectable);
- } else {
- return 0;
- }
-}
-
-pn_timestamp_t pn_selectable_deadline(pn_selectable_t *selectable)
-{
- assert(selectable);
- if (selectable->deadline) {
- return selectable->deadline(selectable);
- } else {
- return 0;
- }
-}
-
void pn_selectable_readable(pn_selectable_t *selectable)
{
assert(selectable);
@@ -254,10 +221,6 @@ void pn_selectable_set_registered(pn_selectable_t *selectable, bool registered)
bool pn_selectable_is_terminal(pn_selectable_t *selectable)
{
assert(selectable);
- if (!selectable->terminal) {
- selectable->terminal = (pn_selectable_capacity(selectable) < 0 &&
- pn_selectable_pending(selectable) < 0);
- }
return selectable->terminal;
}
@@ -271,3 +234,27 @@ void pn_selectable_free(pn_selectable_t *selectable)
{
pn_free(selectable);
}
+
+static void pni_readable(pn_selectable_t *selectable) {
+ pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_READABLE);
+}
+
+static void pni_writable(pn_selectable_t *selectable) {
+ pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_WRITABLE);
+}
+
+static void pni_expired(pn_selectable_t *selectable) {
+ pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_EXPIRED);
+}
+
+void pn_selectable_collect(pn_selectable_t *selectable, pn_collector_t *collector) {
+ assert(selectable);
+ pn_decref(selectable->collector);
+ selectable->collector = collector;
+ pn_incref(selectable->collector);
+ if (collector) {
+ pn_selectable_on_readable(selectable, pni_readable);
+ pn_selectable_on_writable(selectable, pni_writable);
+ pn_selectable_on_expired(selectable, pni_expired);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/proton-c/src/selectable.h
----------------------------------------------------------------------
diff --git a/proton-c/src/selectable.h b/proton-c/src/selectable.h
index 69f719f..7a5b80b 100644
--- a/proton-c/src/selectable.h
+++ b/proton-c/src/selectable.h
@@ -28,13 +28,6 @@
#include <proton/selectable.h>
-pn_selectable_t *pni_selectable(ssize_t (*capacity)(pn_selectable_t *),
- ssize_t (*pending)(pn_selectable_t *),
- pn_timestamp_t (*deadline)(pn_selectable_t *),
- void (*readable)(pn_selectable_t *),
- void (*writable)(pn_selectable_t *),
- void (*expired)(pn_selectable_t *),
- void (*finalize)(pn_selectable_t *));
void *pni_selectable_get_context(pn_selectable_t *selectable);
void pni_selectable_set_context(pn_selectable_t *selectable, void *context);
int pni_selectable_get_index(pn_selectable_t *selectable);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c69cfe64/tests/python/proton_tests/messenger.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/messenger.py b/tests/python/proton_tests/messenger.py
index db815b7..b22c16d 100644
--- a/tests/python/proton_tests/messenger.py
+++ b/tests/python/proton_tests/messenger.py
@@ -961,9 +961,9 @@ class Pump:
sel.free()
self.selectables.remove(sel)
else:
- if sel.capacity > 0:
+ if sel.reading:
reading.append(sel)
- if sel.pending > 0:
+ if sel.writing:
writing.append(sel)
readable, writable, _ = select(reading, writing, [], 0)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org