You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/09/19 19:59:29 UTC

svn commit: r1626288 - in /qpid/proton/trunk/proton-c: include/proton/messenger.h src/messenger/messenger.c

Author: rhs
Date: Fri Sep 19 17:59:28 2014
New Revision: 1626288

URL: http://svn.apache.org/r1626288
Log:
PROTON-674: patch from dominic for setting ttl on a subscription

Modified:
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/messenger/messenger.c

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1626288&r1=1626287&r2=1626288&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Fri Sep 19 17:59:28 2014
@@ -472,6 +472,20 @@ PN_EXTERN bool pn_messenger_stopped(pn_m
 PN_EXTERN pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source);
 
 /**
+ * Subscribes a messenger to messages from the specified source with the given
+ * timeout for the subscription's lifetime.
+ *
+ * @param[in] messenger the messenger to subscribe
+ * @param[in] source
+ * @param[in] timeout the maximum time to keep the subscription alive once the
+ *            link is closed.
+ * @return a subscription
+ */
+PN_EXTERN pn_subscription_t *
+pn_messenger_subscribe_ttl(pn_messenger_t *messenger, const char *source,
+                           pn_seconds_t timeout);
+
+/**
  * Get a subscription's application context.
  *
  * See ::pn_subscription_set_context().

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1626288&r1=1626287&r2=1626288&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Fri Sep 19 17:59:28 2014
@@ -1656,7 +1656,8 @@ pn_connection_t *pn_messenger_resolve(pn
   return connection;
 }
 
-pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, bool sender)
+pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address,
+                             bool sender, pn_seconds_t timeout)
 {
   char *name = NULL;
   pn_connection_t *connection = pn_messenger_resolve(messenger, address, &name);
@@ -1705,6 +1706,14 @@ pn_link_t *pn_messenger_link(pn_messenge
     pn_terminus_set_address(pn_link_source(link), name);
   }
   link_ctx_setup( messenger, connection, link );
+
+  if (timeout > 0) {
+    pn_terminus_set_expiry_policy(pn_link_target(link), PN_EXPIRE_WITH_LINK);
+    pn_terminus_set_expiry_policy(pn_link_source(link), PN_EXPIRE_WITH_LINK);
+    pn_terminus_set_timeout(pn_link_target(link), timeout);
+    pn_terminus_set_timeout(pn_link_source(link), timeout);
+  }
+
   if (!sender) {
     pn_link_ctx_t *ctx = (pn_link_ctx_t *)pn_link_get_context(link);
     assert( ctx );
@@ -1715,18 +1724,27 @@ pn_link_t *pn_messenger_link(pn_messenge
   return link;
 }
 
-pn_link_t *pn_messenger_source(pn_messenger_t *messenger, const char *source)
+pn_link_t *pn_messenger_source(pn_messenger_t *messenger, const char *source,
+                               pn_seconds_t timeout)
 {
-  return pn_messenger_link(messenger, source, false);
+  return pn_messenger_link(messenger, source, false, timeout);
 }
 
-pn_link_t *pn_messenger_target(pn_messenger_t *messenger, const char *target)
+pn_link_t *pn_messenger_target(pn_messenger_t *messenger, const char *target,
+                               pn_seconds_t timeout)
 {
-  return pn_messenger_link(messenger, target, true);
+  return pn_messenger_link(messenger, target, true, timeout);
 }
 
 pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
 {
+  return pn_messenger_subscribe_ttl(messenger, source, 0);
+}
+
+pn_subscription_t *pn_messenger_subscribe_ttl(pn_messenger_t *messenger,
+                                              const char *source,
+                                              pn_seconds_t timeout)
+{
   pni_route(messenger, source);
   if (pn_error_code(messenger->error)) return NULL;
 
@@ -1743,7 +1761,7 @@ pn_subscription_t *pn_messenger_subscrib
       return NULL;
     }
   } else {
-    pn_link_t *src = pn_messenger_source(messenger, source);
+    pn_link_t *src = pn_messenger_source(messenger, source, timeout);
     if (!src) return NULL;
     pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( src );
     return ctx ? ctx->subscription : NULL;
@@ -1916,7 +1934,7 @@ int pn_messenger_put(pn_messenger_t *mes
     } else {
       pni_restore(messenger, msg);
       pn_buffer_append(buf, encoded, size); // XXX
-      pn_link_t *sender = pn_messenger_target(messenger, address);
+      pn_link_t *sender = pn_messenger_target(messenger, address, 0);
       if (!sender) {
         int err = pn_error_code(messenger->error);
         if (err) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org