You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/11/02 16:53:01 UTC

svn commit: r1405017 - in /qpid/proton/trunk/proton-c: bindings/python/proton.py include/proton/driver.h include/proton/messenger.h src/driver.c src/messenger.c

Author: rhs
Date: Fri Nov  2 15:53:01 2012
New Revision: 1405017

URL: http://svn.apache.org/viewvc?rev=1405017&view=rev
Log:
added subscription tracking

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/driver.h
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/driver.c
    qpid/proton/trunk/proton-c/src/messenger.c

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1405017&r1=1405016&r2=1405017&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Fri Nov  2 15:53:01 2012
@@ -329,7 +329,9 @@ track the status of this many outgoing d
     @type source: string
     @param source: the source of messages to subscribe to
     """
-    self._check(pn_messenger_subscribe(self._mng, source))
+    sub_impl = pn_messenger_subscribe(self._mng, source)
+    if not sub_impl:
+      self._check(PN_ERR)
 
   def put(self, message):
     """

Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1405017&r1=1405016&r2=1405017&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Fri Nov  2 15:53:01 2012
@@ -189,6 +189,8 @@ pn_connector_t *pn_listener_accept(pn_li
  */
 void *pn_listener_context(pn_listener_t *listener);
 
+void pn_listener_set_context(pn_listener_t *listener, void *context);
+
 /** Close the socket used by the listener.
  *
  * @param[in] listener the listener whose socket will be closed.

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1405017&r1=1405016&r2=1405017&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Fri Nov  2 15:53:01 2012
@@ -34,6 +34,7 @@ extern "C" {
  */
 
 typedef struct pn_messenger_t pn_messenger_t; /**< Messenger*/
+typedef struct pn_subscription_t pn_subscription_t; /**< Subscription*/
 typedef int64_t pn_tracker_t;
 
 typedef enum {
@@ -209,10 +210,13 @@ int pn_messenger_stop(pn_messenger_t *me
  * @param[in] messenger the messenger to subscribe
  * @param[in] source
  *
- * @return an error code or zero on success
- * @see error.h
+ * @return a subscription
  */
-int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source);
+pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source);
+
+void *pn_subscription_get_context(pn_subscription_t *sub);
+
+void pn_subscription_set_context(pn_subscription_t *sub, void *context);
 
 /** Puts a message on the outgoing message queue for a messenger.
  *
@@ -289,6 +293,8 @@ int pn_messenger_get(pn_messenger_t *mes
  */
 pn_tracker_t pn_messenger_incoming_tracker(pn_messenger_t *messenger);
 
+pn_subscription_t *pn_messenger_incoming_subscription(pn_messenger_t *messenger);
+
 #define PN_CUMULATIVE (0x1)
 
 /** Accepts the incoming messages identified by the tracker. Use the

Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1405017&r1=1405016&r2=1405017&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Fri Nov  2 15:53:01 2012
@@ -21,6 +21,7 @@
 
 #define _POSIX_C_SOURCE 1
 
+#include <assert.h>
 #include <poll.h>
 #include <stdio.h>
 #include <time.h>
@@ -210,6 +211,12 @@ void *pn_listener_context(pn_listener_t 
   return l ? l->context : NULL;
 }
 
+void pn_listener_set_context(pn_listener_t *listener, void *context)
+{
+  assert(listener);
+  listener->context = context;
+}
+
 static void pn_configure_sock(int sock) {
   // this would be nice, but doesn't appear to exist on linux
   /*

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1405017&r1=1405016&r2=1405017&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Fri Nov  2 15:53:01 2012
@@ -55,10 +55,19 @@ struct pn_messenger_t {
   pn_queue_t outgoing;
   pn_queue_t incoming;
   pn_accept_mode_t accept_mode;
+  pn_subscription_t *subscriptions;
+  size_t sub_capacity;
+  size_t sub_count;
+  pn_subscription_t *incoming_subscription;
   pn_buffer_t *buffer;
   pn_error_t *error;
 };
 
+struct pn_subscription_t {
+  char *scheme;
+  void *context;
+};
+
 void pn_queue_init(pn_queue_t *queue)
 {
   queue->capacity = 1024;
@@ -241,6 +250,10 @@ pn_messenger_t *pn_messenger(const char 
     pn_queue_init(&m->outgoing);
     pn_queue_init(&m->incoming);
     m->accept_mode = PN_ACCEPT_MODE_AUTO;
+    m->subscriptions = NULL;
+    m->sub_capacity = 0;
+    m->sub_count = 0;
+    m->incoming_subscription = NULL;
     m->buffer = pn_buffer(1024);
     m->error = pn_error();
   }
@@ -326,6 +339,10 @@ void pn_messenger_free(pn_messenger_t *m
     pn_error_free(messenger->error);
     pn_queue_tini(&messenger->incoming);
     pn_queue_tini(&messenger->outgoing);
+    for (int i = 0; i < messenger->sub_count; i++) {
+      free(messenger->subscriptions[i].scheme);
+    }
+    free(messenger->subscriptions);
     free(messenger);
   }
 }
@@ -461,7 +478,8 @@ int pn_messenger_tsync(pn_messenger_t *m
 
     pn_listener_t *l;
     while ((l = pn_driver_listener(messenger->driver))) {
-      char *scheme = pn_listener_context(l);
+      pn_subscription_t *sub = pn_listener_context(l);
+      char *scheme = sub->scheme;
       pn_connector_t *c = pn_listener_accept(l);
       pn_transport_t *t = pn_connector_transport(c);
       pn_ssl_t *ssl = pn_ssl(t);
@@ -545,7 +563,6 @@ int pn_messenger_stop(pn_messenger_t *me
     pn_listener_close(l);
     pn_listener_t *prev = l;
     l = pn_listener_next(l);
-    free(pn_listener_context(prev));
     pn_listener_free(prev);
   }
 
@@ -635,6 +652,27 @@ pn_connection_t *pn_messenger_resolve(pn
   return connection;
 }
 
+pn_subscription_t *pn_subscription(pn_messenger_t *messenger, const char *scheme)
+{
+  PN_ENSURE(messenger->subscriptions, messenger->sub_capacity, messenger->sub_count + 1);
+  pn_subscription_t *sub = messenger->subscriptions + messenger->sub_count++;
+  sub->scheme = pn_strdup(scheme);
+  sub->context = NULL;
+  return sub;
+}
+
+void *pn_subscription_get_context(pn_subscription_t *sub)
+{
+  assert(sub);
+  return sub->context;
+}
+
+void pn_subscription_set_context(pn_subscription_t *sub, void *context)
+{
+  assert(sub);
+  sub->context = context;
+}
+
 pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, bool sender)
 {
   char copy[(address ? strlen(address) : 0) + 1];
@@ -663,12 +701,11 @@ pn_link_t *pn_messenger_link(pn_messenge
   pn_session_open(ssn);
   link = sender ? pn_sender(ssn, "sender-xxx") : pn_receiver(ssn, "receiver-xxx");
   // XXX
-  if (sender) {
-    pn_terminus_set_address(pn_link_target(link), name);
-    pn_terminus_set_address(pn_link_source(link), name);
-  } else {
-    pn_terminus_set_address(pn_link_target(link), name);
-    pn_terminus_set_address(pn_link_source(link), name);
+  pn_terminus_set_address(pn_link_target(link), name);
+  pn_terminus_set_address(pn_link_source(link), name);
+  if (!sender) {
+    pn_subscription_t *sub = pn_subscription(messenger, NULL);
+    pn_link_set_context(link, sub);
   }
   pn_link_open(link);
   return link;
@@ -684,7 +721,7 @@ pn_link_t *pn_messenger_target(pn_messen
   return pn_messenger_link(messenger, target, true);
 }
 
-int pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
+pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
 {
   char copy[strlen(source) + 1];
   strcpy(copy, source);
@@ -700,23 +737,27 @@ int pn_messenger_subscribe(pn_messenger_
 
   if (host[0] == '~') {
     pn_listener_t *lnr = pn_listener(messenger->driver, host + 1,
-                                     port ? port : default_port(scheme),
-                                     pn_strdup(scheme));
+                                     port ? port : default_port(scheme), NULL);
     if (lnr) {
-      return 0;
+      pn_subscription_t *sub = pn_subscription(messenger, scheme);
+      pn_listener_set_context(lnr, sub);
+      return sub;
     } else {
-      return pn_error_format(messenger->error, PN_ERR,
-                             "unable to subscribe to source: %s (%s)", source,
-                             pn_driver_error(messenger->driver));
+      pn_error_format(messenger->error, PN_ERR,
+                      "unable to subscribe to source: %s (%s)", source,
+                      pn_driver_error(messenger->driver));
+      return NULL;
     }
   } else {
     pn_link_t *src = pn_messenger_source(messenger, source);
     if (src) {
-      return 0;
+      pn_subscription_t *sub = pn_link_get_context(src);
+      return sub;
     } else {
-      return pn_error_format(messenger->error, PN_ERR,
-                             "unable to subscribe to source: %s (%s)", source,
-                             pn_driver_error(messenger->driver));
+      pn_error_format(messenger->error, PN_ERR,
+                      "unable to subscribe to source: %s (%s)", source,
+                      pn_driver_error(messenger->driver));
+      return NULL;
     }
   }
 }
@@ -962,6 +1003,7 @@ int pn_messenger_get(pn_messenger_t *mes
     while (d) {
       if (pn_delivery_readable(d) && !pn_delivery_partial(d)) {
         pn_link_t *l = pn_delivery_link(d);
+        pn_subscription_t *sub = pn_link_get_context(l);
         size_t pending = pn_delivery_pending(d);
         pn_buffer_t *buf = messenger->buffer;
         int err = pn_buffer_ensure(buf, pending + 1);
@@ -978,6 +1020,7 @@ int pn_messenger_get(pn_messenger_t *mes
           return pn_error_format(messenger->error, n, "PN_EOS expected");
         }
         pn_queue_add(&messenger->incoming, d);
+        messenger->incoming_subscription = sub;
         if (msg) {
           int err = pn_message_decode(msg, encoded, pending);
           if (err) {
@@ -1018,6 +1061,12 @@ pn_tracker_t pn_messenger_incoming_track
   return pn_tracker(INCOMING, messenger->incoming.hwm - 1);
 }
 
+pn_subscription_t *pn_messenger_incoming_subscription(pn_messenger_t *messenger)
+{
+  assert(messenger);
+  return messenger->incoming_subscription;
+}
+
 int pn_messenger_accept(pn_messenger_t *messenger, pn_tracker_t tracker, int flags)
 {
   if (pn_tracker_direction(tracker) != INCOMING) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org