You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/09/19 19:59:29 UTC
svn commit: r1626288 - in /qpid/proton/trunk/proton-c:
include/proton/messenger.h src/messenger/messenger.c
Author: rhs
Date: Fri Sep 19 17:59:28 2014
New Revision: 1626288
URL: http://svn.apache.org/r1626288
Log:
PROTON-674: patch from dominic for setting ttl on a subscription
Modified:
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/messenger/messenger.c
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1626288&r1=1626287&r2=1626288&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Fri Sep 19 17:59:28 2014
@@ -472,6 +472,20 @@ PN_EXTERN bool pn_messenger_stopped(pn_m
PN_EXTERN pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source);
/**
+ * Subscribes a messenger to messages from the specified source with the given
+ * timeout for the subscription's lifetime.
+ *
+ * @param[in] messenger the messenger to subscribe
+ * @param[in] source
+ * @param[in] timeout the maximum time to keep the subscription alive once the
+ * link is closed.
+ * @return a subscription
+ */
+PN_EXTERN pn_subscription_t *
+pn_messenger_subscribe_ttl(pn_messenger_t *messenger, const char *source,
+ pn_seconds_t timeout);
+
+/**
* Get a subscription's application context.
*
* See ::pn_subscription_set_context().
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1626288&r1=1626287&r2=1626288&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Fri Sep 19 17:59:28 2014
@@ -1656,7 +1656,8 @@ pn_connection_t *pn_messenger_resolve(pn
return connection;
}
-pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, bool sender)
+pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address,
+ bool sender, pn_seconds_t timeout)
{
char *name = NULL;
pn_connection_t *connection = pn_messenger_resolve(messenger, address, &name);
@@ -1705,6 +1706,14 @@ pn_link_t *pn_messenger_link(pn_messenge
pn_terminus_set_address(pn_link_source(link), name);
}
link_ctx_setup( messenger, connection, link );
+
+ if (timeout > 0) {
+ pn_terminus_set_expiry_policy(pn_link_target(link), PN_EXPIRE_WITH_LINK);
+ pn_terminus_set_expiry_policy(pn_link_source(link), PN_EXPIRE_WITH_LINK);
+ pn_terminus_set_timeout(pn_link_target(link), timeout);
+ pn_terminus_set_timeout(pn_link_source(link), timeout);
+ }
+
if (!sender) {
pn_link_ctx_t *ctx = (pn_link_ctx_t *)pn_link_get_context(link);
assert( ctx );
@@ -1715,18 +1724,27 @@ pn_link_t *pn_messenger_link(pn_messenge
return link;
}
-pn_link_t *pn_messenger_source(pn_messenger_t *messenger, const char *source)
+pn_link_t *pn_messenger_source(pn_messenger_t *messenger, const char *source,
+ pn_seconds_t timeout)
{
- return pn_messenger_link(messenger, source, false);
+ return pn_messenger_link(messenger, source, false, timeout);
}
-pn_link_t *pn_messenger_target(pn_messenger_t *messenger, const char *target)
+pn_link_t *pn_messenger_target(pn_messenger_t *messenger, const char *target,
+ pn_seconds_t timeout)
{
- return pn_messenger_link(messenger, target, true);
+ return pn_messenger_link(messenger, target, true, timeout);
}
pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
{
+ return pn_messenger_subscribe_ttl(messenger, source, 0);
+}
+
+pn_subscription_t *pn_messenger_subscribe_ttl(pn_messenger_t *messenger,
+ const char *source,
+ pn_seconds_t timeout)
+{
pni_route(messenger, source);
if (pn_error_code(messenger->error)) return NULL;
@@ -1743,7 +1761,7 @@ pn_subscription_t *pn_messenger_subscrib
return NULL;
}
} else {
- pn_link_t *src = pn_messenger_source(messenger, source);
+ pn_link_t *src = pn_messenger_source(messenger, source, timeout);
if (!src) return NULL;
pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( src );
return ctx ? ctx->subscription : NULL;
@@ -1916,7 +1934,7 @@ int pn_messenger_put(pn_messenger_t *mes
} else {
pni_restore(messenger, msg);
pn_buffer_append(buf, encoded, size); // XXX
- pn_link_t *sender = pn_messenger_target(messenger, address);
+ pn_link_t *sender = pn_messenger_target(messenger, address, 0);
if (!sender) {
int err = pn_error_code(messenger->error);
if (err) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org