You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/03/23 17:29:53 UTC

svn commit: r1460189 - in /qpid/proton/trunk: proton-c/ proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/ proton-j/proton-api/src/main/resources/ tests/python/proton_tests/

Author: rhs
Date: Sat Mar 23 16:29:53 2013
New Revision: 1460189

URL: http://svn.apache.org/r1460189
Log:
PROTON-160: added pn_messenger_route to the messenger API

Modified:
    qpid/proton/trunk/proton-c/CMakeLists.txt
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/proton-j/proton-api/src/main/resources/   (props changed)
    qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
    qpid/proton/trunk/tests/python/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/CMakeLists.txt?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/CMakeLists.txt Sat Mar 23 16:29:53 2013
@@ -185,7 +185,7 @@ if (CMAKE_COMPILER_IS_GNUCC)
   if (ENABLE_WARNING_ERROR)
     set (WERROR "-Werror")
   endif (ENABLE_WARNING_ERROR)
-  set (COMPILE_WARNING_FLAGS "${WERROR} -Wall -pedantic-errors")
+  set (COMPILE_WARNING_FLAGS "${WERROR} -Wall -pedantic-errors -Wno-comment")
   if (NOT BUILD_WITH_CXX)
     set (COMPILE_LANGUAGE_FLAGS "-std=c99")
     set (COMPILE_PLATFORM_FLAGS "-std=gnu99")

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Sat Mar 23 16:29:53 2013
@@ -489,6 +489,9 @@ send. Defaults to zero.
     """
     return pn_messenger_incoming(self._mng)
 
+  def route(self, pattern, host):
+    self._check(pn_messenger_route(self._mng, pattern, host))
+
 class Message(object):
   """
   The L{Message} class is a mutable holder of message content.

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Sat Mar 23 16:29:53 2013
@@ -381,6 +381,73 @@ PN_EXTERN int pn_messenger_outgoing(pn_m
  */
 PN_EXTERN int pn_messenger_incoming(pn_messenger_t *messenger);
 
+/** Adds a routing rule to a Messenger's internal routing table.
+ *
+ * The route procedure may be used to influence how a messenger will
+ * internally treat a given address or class of addresses. Every call
+ * to the route procedure will result in messenger appending a routing
+ * rule to its internal routing table.
+ *
+ * Whenever a message is presented to a messenger for delivery, it
+ * will match the address of this message against the set of routing
+ * rules in order. The first rule to match will be triggered, and
+ * instead of routing based on the address presented in the message,
+ * the messenger will route based on the address supplied in the rule.
+ *
+ * The pattern matching syntax supports two wildcards: a '%' matches
+ * any sequence of characters not containing a '/', and a '*' matches
+ * any sequence of characters, including '/'.
+ *
+ * A routing address is specified as a normal AMQP address, but it may
+ * also use substitution variables: $N is replaced by the text matched
+ * by the Nth wildcard of the pattern that triggered the rule.
+ *
+ * Any message sent to "foo" will be routed to "amqp://foo.com":
+ *
+ *   pn_messenger_route(messenger, "foo", "amqp://foo.com");
+ *
+ * Any message sent to "foobar" will be routed to
+ * "amqp://foo.com/bar":
+ *
+ *   pn_messenger_route(messenger, "foobar", "amqp://foo.com/bar");
+ *
+ * Any message sent to bar/<path> will be routed to the corresponding
+ * path within the amqp://bar.com domain:
+ *
+ *   pn_messenger_route(messenger, "bar/*", "amqp://bar.com/$1");
+ *
+ * Route all messages over TLS:
+ *
+ *   pn_messenger_route(messenger, "amqp:*", "amqps:$1");
+ *
+ * Supply credentials for foo.com:
+ *
+ *   pn_messenger_route(messenger, "amqp://foo.com/*", "amqp://user:password@foo.com/$1");
+ *
+ * Supply credentials for all domains:
+ *
+ *   pn_messenger_route(messenger, "amqp://*", "amqp://user:password@$1");
+ *
+ * Route all addresses through a single proxy while preserving the
+ * original destination:
+ *
+ *   pn_messenger_route(messenger, "amqp://%/*", "amqp://user:password@proxy/$1/$2");
+ *
+ * Route any address through a single broker:
+ *
+ *   pn_messenger_route(messenger, "*", "amqp://user:password@broker/$1");
+ *
+ * @param[in] messenger the Messenger
+ * @param[in] pattern a glob pattern
+ * @param[in] address an address indicating alternative routing
+ *
+ * @return an error code or zero on success
+ * @see error.h
+ *
+ */
+PN_EXTERN int pn_messenger_route(pn_messenger_t *messenger, const char *pattern,
+                                 const char *address);
+
 #ifdef __cplusplus
 }
 #endif

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Sat Mar 23 16:29:53 2013
@@ -25,6 +25,7 @@
 #include <proton/ssl.h>
 #include <proton/buffer.h>
 #include <assert.h>
+#include <ctype.h>
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
@@ -40,6 +41,42 @@ typedef struct {
   pn_delivery_t **deliveries;
 } pn_queue_t;
 
+typedef struct {
+  const char *start;
+  size_t size;
+} pn_group_t;
+
+#define MAX_GROUP (64)
+
+typedef struct {
+  size_t groups;
+  pn_group_t group[MAX_GROUP];
+} pn_matcher_t;
+
+#define PN_MAX_ADDR (1024)
+
+typedef struct {
+  char text[PN_MAX_ADDR + 1];
+  char *scheme;
+  char *user;
+  char *pass;
+  char *host;
+  char *port;
+  char *name;
+} pn_address_t;
+
+typedef struct pn_route_t pn_route_t;
+
+#define PN_MAX_PATTERN (255)
+#define PN_MAX_ROUTE (255)
+
+struct pn_route_t {
+  char pattern[PN_MAX_PATTERN + 1];
+  char address[PN_MAX_ROUTE + 1];
+  pn_route_t *next;
+};
+
+
 struct pn_messenger_t {
   char *name;
   char *certificate;
@@ -61,6 +98,9 @@ struct pn_messenger_t {
   pn_subscription_t *incoming_subscription;
   pn_buffer_t *buffer;
   pn_error_t *error;
+  pn_route_t *routes;
+  pn_matcher_t matcher;
+  pn_address_t address;
 };
 
 struct pn_subscription_t {
@@ -276,6 +316,7 @@ pn_messenger_t *pn_messenger(const char 
     m->incoming_subscription = NULL;
     m->buffer = pn_buffer(1024);
     m->error = pn_error();
+    m->routes = NULL;
   }
 
   return m;
@@ -363,6 +404,12 @@ void pn_messenger_free(pn_messenger_t *m
       free(messenger->subscriptions[i].scheme);
     }
     free(messenger->subscriptions);
+    pn_route_t *route = messenger->routes;
+    while (route) {
+      pn_route_t *next = route->next;
+      free(route);
+      route = next;
+    }
     free(messenger);
   }
 }
@@ -734,16 +781,140 @@ static const char *default_port(const ch
   else
     return "5672";
 }
-pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, char *address, char **name)
+
+static void pni_sub(pn_matcher_t *matcher, size_t group, const char *text, size_t matched)
+{
+  if (group > matcher->groups) {
+    matcher->groups = group;
+  }
+  matcher->group[group].start = text - matched;
+  matcher->group[group].size = matched;
+}
+
+static bool pni_match_r(pn_matcher_t *matcher, const char *pattern, const char *text, size_t group, size_t matched)
+{
+  bool match;
+
+  char p = *pattern;
+  char c = *text;
+
+  switch (p) {
+  case '\0': return c == '\0';
+  case '%':
+  case '*':
+    switch (c) {
+    case '\0':
+      match = pni_match_r(matcher, pattern + 1, text, group + 1, 0);
+      if (match) pni_sub(matcher, group, text, matched);
+      return match;
+    case '/':
+      if (p == '%') {
+        match = pni_match_r(matcher, pattern + 1, text, group + 1, 0);
+        if (match) pni_sub(matcher, group, text, matched);
+        return match;
+      }
+    default:
+      match = pni_match_r(matcher, pattern, text + 1, group, matched + 1);
+      if (!match) {
+        match = pni_match_r(matcher, pattern + 1, text, group + 1, 0);
+        if (match) pni_sub(matcher, group, text, matched);
+      }
+      return match;
+    }
+  default:
+    return c == p && pni_match_r(matcher, pattern + 1, text + 1, group, 0);
+  }
+}
+
+static bool pni_match(pn_matcher_t *matcher, const char *pattern, const char *text)
+{
+  matcher->groups = 0;
+  if (pni_match_r(matcher, pattern, text, 1, 0)) {
+    matcher->group[0].start = text;
+    matcher->group[0].size = strlen(text);
+    return true;
+  } else {
+    matcher->groups = 0;
+    return false;
+  }
+}
+
+static void pni_substitute(pn_matcher_t *matcher, const char *pattern, char *dest)
+{
+  while (*pattern) {
+    switch (*pattern) {
+    case '$':
+      pattern++;
+      if (*pattern == '$') {
+        *dest++ = *pattern++;
+      } else {
+        size_t idx = 0;
+        while (isdigit(*pattern)) {
+          idx *= 10;
+          idx += *pattern++ - '0';
+        }
+
+        if (idx <= matcher->groups) {
+          pn_group_t *group = &matcher->group[idx];
+          for (size_t i = 0; i < group->size; i++) {
+            *dest++ = group->start[i];
+          }
+        }
+      }
+      break;
+    default:
+      *dest++ = *pattern++;
+      break;
+    }
+  }
+
+  *dest = '\0';
+}
+
+static void pni_parse(pn_address_t *address)
+{
+  address->scheme = NULL;
+  address->user = NULL;
+  address->pass = NULL;
+  address->host = NULL;
+  address->port = NULL;
+  address->name = NULL;
+  parse_url(address->text, &address->scheme, &address->user, &address->pass,
+            &address->host, &address->port, &address->name);
+}
+
+static pn_route_t *pni_route(pn_messenger_t *messenger, const char *address)
+{
+  pn_address_t *addr = &messenger->address;
+  pn_route_t *route = messenger->routes;
+  while (route) {
+    if (pni_match(&messenger->matcher, route->pattern, address)) {
+      pni_substitute(&messenger->matcher, route->address, addr->text);
+      pni_parse(addr);
+      return route;
+    }
+    route = route->next;
+  }
+
+  strcpy(addr->text, address);
+  pni_parse(addr);
+
+  return NULL;
+}
+
+pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *address, char **name)
 {
   char domain[256];
   if (sizeof(domain) < strlen(address) + 1) return NULL;
-  char *scheme = NULL;
-  char *user = NULL;
-  char *pass = NULL;
-  char *host = (char *) "0.0.0.0";
-  char *port = NULL;
-  parse_url(address, &scheme, &user, &pass, &host, &port, name);
+
+  pni_route(messenger, address);
+
+  char *scheme = messenger->address.scheme;
+  char *user = messenger->address.user;
+  char *pass = messenger->address.pass;
+  char *host = messenger->address.host;
+  char *port = messenger->address.port;
+  *name = messenger->address.name;
 
   domain[0] = '\0';
 
@@ -809,15 +980,8 @@ void pn_subscription_set_context(pn_subs
 
 pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, bool sender)
 {
-  char copy[256];
-  if (sizeof(copy) <= (address ? strlen(address) : 0)) return NULL;
-  if (address) {
-    strcpy(copy, address);
-  } else {
-    copy[0] = '\0';
-  }
   char *name = NULL;
-  pn_connection_t *connection = pn_messenger_resolve(messenger, copy, &name);
+  pn_connection_t *connection = pn_messenger_resolve(messenger, address, &name);
   if (!connection) return NULL;
 
   pn_link_t *link = pn_link_head(connection, PN_LOCAL_ACTIVE);
@@ -858,18 +1022,11 @@ pn_link_t *pn_messenger_target(pn_messen
 
 pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
 {
-  char copy[256];
-  if (strlen(source) >= sizeof(copy)) return NULL;
-  strcpy(copy, source);
-
-  char *scheme = NULL;
-  char *user = NULL;
-  char *pass = NULL;
-  char *host = (char *) "0.0.0.0";
-  char *port = NULL;
-  char *path = NULL;
+  pni_route(messenger, source);
 
-  parse_url(copy, &scheme, &user, &pass, &host, &port, &path);
+  char *scheme = messenger->address.scheme;
+  char *host = messenger->address.host;
+  char *port = messenger->address.port;
 
   if (host[0] == '~') {
     pn_listener_t *lnr = pn_listener(messenger->driver, host + 1,
@@ -1265,3 +1422,28 @@ int pn_messenger_incoming(pn_messenger_t
 {
   return pn_messenger_queued(messenger, false);
 }
+
+int pn_messenger_route(pn_messenger_t *messenger, const char *pattern, const char *address)
+{
+  if (strlen(pattern) > PN_MAX_PATTERN || strlen(address) > PN_MAX_ROUTE) {
+    return PN_ERR;
+  }
+  pn_route_t *route = (pn_route_t *) malloc(sizeof(pn_route_t));
+  if (!route) return PN_ERR;
+
+  strcpy(route->pattern, pattern);
+  strcpy(route->address, address);
+  route->next = NULL;
+
+  pn_route_t *tail = messenger->routes;
+  if (!tail) {
+    messenger->routes = route;
+  } else {
+    while (tail->next) {
+      tail = tail->next;
+    }
+    tail->next = route;
+  }
+
+  return 0;
+}

Propchange: qpid/proton/trunk/proton-j/proton-api/src/main/resources/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Mar 23 16:29:53 2013
@@ -0,0 +1 @@
+*.class

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Sat Mar 23 16:29:53 2013
@@ -1048,6 +1048,9 @@ class Messenger(object):
   def __init__(self, *args, **kwargs):
     self.impl = messengerFactory.createMessenger()
 
+  def route(self, *args, **kwargs):
+    raise Skipped()
+
   def start(self):
     self.impl.start()
 

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Sat Mar 23 16:29:53 2013
@@ -341,9 +341,9 @@ class MessengerTest(Test):
 
   def testRoute(self):
     self.server.subscribe("amqps://~0.0.0.0:12346")
+    self.start()
     self.client.route("route1", "amqp://0.0.0.0:12345")
     self.client.route("route2", "amqps://0.0.0.0:12346")
-    self.start()
 
     msg = Message()
     msg.address = "route1"
@@ -364,8 +364,8 @@ class MessengerTest(Test):
     assert reply.body == "test"
 
   def testDefaultRoute(self):
-    self.client.route("*", "amqp://0.0.0.0:12345")
     self.start()
+    self.client.route("*", "amqp://0.0.0.0:12345")
 
     msg = Message()
     msg.address = "asdf"
@@ -379,8 +379,8 @@ class MessengerTest(Test):
     assert reply.body == "test"
 
   def testDefaultRouteSubstitution(self):
-    self.client.route("*", "amqp://0.0.0.0:12345/$1")
     self.start()
+    self.client.route("*", "amqp://0.0.0.0:12345/$1")
 
     msg = Message()
     msg.address = "asdf"
@@ -394,9 +394,9 @@ class MessengerTest(Test):
     assert reply.body == "test"
 
   def testIncomingRoute(self):
+    self.start()
     self.client.route("in", "amqp://~0.0.0.0:12346")
     self.client.subscribe("in")
-    self.start()
 
     msg = Message()
     msg.address = "amqp://0.0.0.0:12345"



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org