You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/11/02 16:53:01 UTC
svn commit: r1405017 - in /qpid/proton/trunk/proton-c:
bindings/python/proton.py include/proton/driver.h
include/proton/messenger.h src/driver.c src/messenger.c
Author: rhs
Date: Fri Nov 2 15:53:01 2012
New Revision: 1405017
URL: http://svn.apache.org/viewvc?rev=1405017&view=rev
Log:
added subscription tracking
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/driver.h
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/driver.c
qpid/proton/trunk/proton-c/src/messenger.c
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1405017&r1=1405016&r2=1405017&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Fri Nov 2 15:53:01 2012
@@ -329,7 +329,9 @@ track the status of this many outgoing d
@type source: string
@param source: the source of messages to subscribe to
"""
- self._check(pn_messenger_subscribe(self._mng, source))
+ sub_impl = pn_messenger_subscribe(self._mng, source)
+ if not sub_impl:
+ self._check(PN_ERR)
def put(self, message):
"""
Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1405017&r1=1405016&r2=1405017&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Fri Nov 2 15:53:01 2012
@@ -189,6 +189,8 @@ pn_connector_t *pn_listener_accept(pn_li
*/
void *pn_listener_context(pn_listener_t *listener);
+void pn_listener_set_context(pn_listener_t *listener, void *context);
+
/** Close the socket used by the listener.
*
* @param[in] listener the listener whose socket will be closed.
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1405017&r1=1405016&r2=1405017&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Fri Nov 2 15:53:01 2012
@@ -34,6 +34,7 @@ extern "C" {
*/
typedef struct pn_messenger_t pn_messenger_t; /**< Messenger*/
+typedef struct pn_subscription_t pn_subscription_t; /**< Subscription*/
typedef int64_t pn_tracker_t;
typedef enum {
@@ -209,10 +210,13 @@ int pn_messenger_stop(pn_messenger_t *me
* @param[in] messenger the messenger to subscribe
* @param[in] source
*
- * @return an error code or zero on success
- * @see error.h
+ * @return a subscription
*/
-int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source);
+pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source);
+
+void *pn_subscription_get_context(pn_subscription_t *sub);
+
+void pn_subscription_set_context(pn_subscription_t *sub, void *context);
/** Puts a message on the outgoing message queue for a messenger.
*
@@ -289,6 +293,8 @@ int pn_messenger_get(pn_messenger_t *mes
*/
pn_tracker_t pn_messenger_incoming_tracker(pn_messenger_t *messenger);
+pn_subscription_t *pn_messenger_incoming_subscription(pn_messenger_t *messenger);
+
#define PN_CUMULATIVE (0x1)
/** Accepts the incoming messages identified by the tracker. Use the
Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1405017&r1=1405016&r2=1405017&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Fri Nov 2 15:53:01 2012
@@ -21,6 +21,7 @@
#define _POSIX_C_SOURCE 1
+#include <assert.h>
#include <poll.h>
#include <stdio.h>
#include <time.h>
@@ -210,6 +211,12 @@ void *pn_listener_context(pn_listener_t
return l ? l->context : NULL;
}
+void pn_listener_set_context(pn_listener_t *listener, void *context)
+{
+ assert(listener);
+ listener->context = context;
+}
+
static void pn_configure_sock(int sock) {
// this would be nice, but doesn't appear to exist on linux
/*
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1405017&r1=1405016&r2=1405017&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Fri Nov 2 15:53:01 2012
@@ -55,10 +55,19 @@ struct pn_messenger_t {
pn_queue_t outgoing;
pn_queue_t incoming;
pn_accept_mode_t accept_mode;
+ pn_subscription_t *subscriptions;
+ size_t sub_capacity;
+ size_t sub_count;
+ pn_subscription_t *incoming_subscription;
pn_buffer_t *buffer;
pn_error_t *error;
};
+struct pn_subscription_t {
+ char *scheme;
+ void *context;
+};
+
void pn_queue_init(pn_queue_t *queue)
{
queue->capacity = 1024;
@@ -241,6 +250,10 @@ pn_messenger_t *pn_messenger(const char
pn_queue_init(&m->outgoing);
pn_queue_init(&m->incoming);
m->accept_mode = PN_ACCEPT_MODE_AUTO;
+ m->subscriptions = NULL;
+ m->sub_capacity = 0;
+ m->sub_count = 0;
+ m->incoming_subscription = NULL;
m->buffer = pn_buffer(1024);
m->error = pn_error();
}
@@ -326,6 +339,10 @@ void pn_messenger_free(pn_messenger_t *m
pn_error_free(messenger->error);
pn_queue_tini(&messenger->incoming);
pn_queue_tini(&messenger->outgoing);
+ for (int i = 0; i < messenger->sub_count; i++) {
+ free(messenger->subscriptions[i].scheme);
+ }
+ free(messenger->subscriptions);
free(messenger);
}
}
@@ -461,7 +478,8 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_listener_t *l;
while ((l = pn_driver_listener(messenger->driver))) {
- char *scheme = pn_listener_context(l);
+ pn_subscription_t *sub = pn_listener_context(l);
+ char *scheme = sub->scheme;
pn_connector_t *c = pn_listener_accept(l);
pn_transport_t *t = pn_connector_transport(c);
pn_ssl_t *ssl = pn_ssl(t);
@@ -545,7 +563,6 @@ int pn_messenger_stop(pn_messenger_t *me
pn_listener_close(l);
pn_listener_t *prev = l;
l = pn_listener_next(l);
- free(pn_listener_context(prev));
pn_listener_free(prev);
}
@@ -635,6 +652,27 @@ pn_connection_t *pn_messenger_resolve(pn
return connection;
}
+pn_subscription_t *pn_subscription(pn_messenger_t *messenger, const char *scheme)
+{
+ PN_ENSURE(messenger->subscriptions, messenger->sub_capacity, messenger->sub_count + 1);
+ pn_subscription_t *sub = messenger->subscriptions + messenger->sub_count++;
+ sub->scheme = pn_strdup(scheme);
+ sub->context = NULL;
+ return sub;
+}
+
+void *pn_subscription_get_context(pn_subscription_t *sub)
+{
+ assert(sub);
+ return sub->context;
+}
+
+void pn_subscription_set_context(pn_subscription_t *sub, void *context)
+{
+ assert(sub);
+ sub->context = context;
+}
+
pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, bool sender)
{
char copy[(address ? strlen(address) : 0) + 1];
@@ -663,12 +701,11 @@ pn_link_t *pn_messenger_link(pn_messenge
pn_session_open(ssn);
link = sender ? pn_sender(ssn, "sender-xxx") : pn_receiver(ssn, "receiver-xxx");
// XXX
- if (sender) {
- pn_terminus_set_address(pn_link_target(link), name);
- pn_terminus_set_address(pn_link_source(link), name);
- } else {
- pn_terminus_set_address(pn_link_target(link), name);
- pn_terminus_set_address(pn_link_source(link), name);
+ pn_terminus_set_address(pn_link_target(link), name);
+ pn_terminus_set_address(pn_link_source(link), name);
+ if (!sender) {
+ pn_subscription_t *sub = pn_subscription(messenger, NULL);
+ pn_link_set_context(link, sub);
}
pn_link_open(link);
return link;
@@ -684,7 +721,7 @@ pn_link_t *pn_messenger_target(pn_messen
return pn_messenger_link(messenger, target, true);
}
-int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
+pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
{
char copy[strlen(source) + 1];
strcpy(copy, source);
@@ -700,23 +737,27 @@ int pn_messenger_subscribe(pn_messenger_
if (host[0] == '~') {
pn_listener_t *lnr = pn_listener(messenger->driver, host + 1,
- port ? port : default_port(scheme),
- pn_strdup(scheme));
+ port ? port : default_port(scheme), NULL);
if (lnr) {
- return 0;
+ pn_subscription_t *sub = pn_subscription(messenger, scheme);
+ pn_listener_set_context(lnr, sub);
+ return sub;
} else {
- return pn_error_format(messenger->error, PN_ERR,
- "unable to subscribe to source: %s (%s)", source,
- pn_driver_error(messenger->driver));
+ pn_error_format(messenger->error, PN_ERR,
+ "unable to subscribe to source: %s (%s)", source,
+ pn_driver_error(messenger->driver));
+ return NULL;
}
} else {
pn_link_t *src = pn_messenger_source(messenger, source);
if (src) {
- return 0;
+ pn_subscription_t *sub = pn_link_get_context(src);
+ return sub;
} else {
- return pn_error_format(messenger->error, PN_ERR,
- "unable to subscribe to source: %s (%s)", source,
- pn_driver_error(messenger->driver));
+ pn_error_format(messenger->error, PN_ERR,
+ "unable to subscribe to source: %s (%s)", source,
+ pn_driver_error(messenger->driver));
+ return NULL;
}
}
}
@@ -962,6 +1003,7 @@ int pn_messenger_get(pn_messenger_t *mes
while (d) {
if (pn_delivery_readable(d) && !pn_delivery_partial(d)) {
pn_link_t *l = pn_delivery_link(d);
+ pn_subscription_t *sub = pn_link_get_context(l);
size_t pending = pn_delivery_pending(d);
pn_buffer_t *buf = messenger->buffer;
int err = pn_buffer_ensure(buf, pending + 1);
@@ -978,6 +1020,7 @@ int pn_messenger_get(pn_messenger_t *mes
return pn_error_format(messenger->error, n, "PN_EOS expected");
}
pn_queue_add(&messenger->incoming, d);
+ messenger->incoming_subscription = sub;
if (msg) {
int err = pn_message_decode(msg, encoded, pending);
if (err) {
@@ -1018,6 +1061,12 @@ pn_tracker_t pn_messenger_incoming_track
return pn_tracker(INCOMING, messenger->incoming.hwm - 1);
}
+pn_subscription_t *pn_messenger_incoming_subscription(pn_messenger_t *messenger)
+{
+ assert(messenger);
+ return messenger->incoming_subscription;
+}
+
int pn_messenger_accept(pn_messenger_t *messenger, pn_tracker_t tracker, int flags)
{
if (pn_tracker_direction(tracker) != INCOMING) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org