You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/03/23 17:29:53 UTC
svn commit: r1460189 - in /qpid/proton/trunk: proton-c/
proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/
proton-j/proton-api/src/main/resources/ tests/python/proton_tests/
Author: rhs
Date: Sat Mar 23 16:29:53 2013
New Revision: 1460189
URL: http://svn.apache.org/r1460189
Log:
PROTON-160: added pn_messenger_route to the messenger API
Modified:
qpid/proton/trunk/proton-c/CMakeLists.txt
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/proton-j/proton-api/src/main/resources/ (props changed)
qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
qpid/proton/trunk/tests/python/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/CMakeLists.txt?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/CMakeLists.txt Sat Mar 23 16:29:53 2013
@@ -185,7 +185,7 @@ if (CMAKE_COMPILER_IS_GNUCC)
if (ENABLE_WARNING_ERROR)
set (WERROR "-Werror")
endif (ENABLE_WARNING_ERROR)
- set (COMPILE_WARNING_FLAGS "${WERROR} -Wall -pedantic-errors")
+ set (COMPILE_WARNING_FLAGS "${WERROR} -Wall -pedantic-errors -Wno-comment")
if (NOT BUILD_WITH_CXX)
set (COMPILE_LANGUAGE_FLAGS "-std=c99")
set (COMPILE_PLATFORM_FLAGS "-std=gnu99")
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Sat Mar 23 16:29:53 2013
@@ -489,6 +489,9 @@ send. Defaults to zero.
"""
return pn_messenger_incoming(self._mng)
+ def route(self, pattern, host):
+ self._check(pn_messenger_route(self._mng, pattern, host))
+
class Message(object):
"""
The L{Message} class is a mutable holder of message content.
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Sat Mar 23 16:29:53 2013
@@ -381,6 +381,73 @@ PN_EXTERN int pn_messenger_outgoing(pn_m
*/
PN_EXTERN int pn_messenger_incoming(pn_messenger_t *messenger);
+/** Adds a routing rule to a Messenger's internal routing table.
+ *
+ * The route procedure may be used to influence how a messenger will
+ * internally treat a given address or class of addresses. Every call
+ * to the route procedure will result in messenger appending a routing
+ * rule to its internal routing table.
+ *
+ * Whenever a message is presented to a messenger for delivery, it
+ * will match the address of this message against the set of routing
+ * rules in order. The first rule to match will be triggered, and
+ * instead of routing based on the address presented in the message,
+ * the messenger will route based on the address supplied in the rule.
+ *
+ * The pattern matching syntax supports two wildcards: a '%' matches
+ * any sequence of characters (possibly empty) not containing a '/',
+ * and a '*' matches any sequence of characters including '/'.
+ *
+ * A routing address is a normal AMQP address that may additionally
+ * use substitution variables: "$N" expands to the text matched by the
+ * N-th wildcard of the pattern, and "$$" to a literal '$'.
+ *
+ * Any message sent to "foo" will be routed to "amqp://foo.com":
+ *
+ * pn_messenger_route(messenger, "foo", "amqp://foo.com");
+ *
+ * Any message sent to "foobar" will be routed to
+ * "amqp://foo.com/bar":
+ *
+ * pn_messenger_route(messenger, "foobar", "amqp://foo.com/bar");
+ *
+ * Any message sent to bar/<path> will be routed to the corresponding
+ * path within the amqp://bar.com domain:
+ *
+ * pn_messenger_route(messenger, "bar/*", "amqp://bar.com/$1");
+ *
+ * Route all messages over TLS:
+ *
+ * pn_messenger_route(messenger, "amqp:*", "amqps:$1");
+ *
+ * Supply credentials for foo.com:
+ *
+ * pn_messenger_route(messenger, "amqp://foo.com/*", "amqp://user:password@foo.com/$1");
+ *
+ * Supply credentials for all domains:
+ *
+ * pn_messenger_route(messenger, "amqp://*", "amqp://user:password@$1");
+ *
+ * Route all addresses through a single proxy while preserving the
+ * original destination:
+ *
+ * pn_messenger_route(messenger, "amqp://%/*", "amqp://user:password@proxy/$1/$2");
+ *
+ * Route any address through a single broker:
+ *
+ * pn_messenger_route(messenger, "*", "amqp://user:password@broker/$1");
+ *
+ * @param[in] messenger the Messenger
+ * @param[in] pattern a glob pattern
+ * @param[in] address an address indicating alternative routing
+ *
+ * @return an error code or zero on success
+ * @see error.h
+ *
+ */
+PN_EXTERN int pn_messenger_route(pn_messenger_t *messenger, const char *pattern,
+ const char *address);
+
#ifdef __cplusplus
}
#endif
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Sat Mar 23 16:29:53 2013
@@ -25,6 +25,7 @@
#include <proton/ssl.h>
#include <proton/buffer.h>
#include <assert.h>
+#include <ctype.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
@@ -40,6 +41,42 @@ typedef struct {
pn_delivery_t **deliveries;
} pn_queue_t;
+typedef struct {
+ const char *start;
+ size_t size;
+} pn_group_t;
+
+#define MAX_GROUP (64)
+
+typedef struct {
+ size_t groups;
+ pn_group_t group[MAX_GROUP];
+} pn_matcher_t;
+
+#define PN_MAX_ADDR (1024)
+
+typedef struct {
+ char text[PN_MAX_ADDR + 1];
+ char *scheme;
+ char *user;
+ char *pass;
+ char *host;
+ char *port;
+ char *name;
+} pn_address_t;
+
+typedef struct pn_route_t pn_route_t;
+
+#define PN_MAX_PATTERN (255)
+#define PN_MAX_ROUTE (255)
+
+struct pn_route_t {
+ char pattern[PN_MAX_PATTERN + 1];
+ char address[PN_MAX_ROUTE + 1];
+ pn_route_t *next;
+};
+
+
struct pn_messenger_t {
char *name;
char *certificate;
@@ -61,6 +98,9 @@ struct pn_messenger_t {
pn_subscription_t *incoming_subscription;
pn_buffer_t *buffer;
pn_error_t *error;
+ pn_route_t *routes;
+ pn_matcher_t matcher;
+ pn_address_t address;
};
struct pn_subscription_t {
@@ -276,6 +316,7 @@ pn_messenger_t *pn_messenger(const char
m->incoming_subscription = NULL;
m->buffer = pn_buffer(1024);
m->error = pn_error();
+ m->routes = NULL;
}
return m;
@@ -363,6 +404,12 @@ void pn_messenger_free(pn_messenger_t *m
free(messenger->subscriptions[i].scheme);
}
free(messenger->subscriptions);
+ pn_route_t *route = messenger->routes;
+ while (route) {
+ pn_route_t *next = route->next;
+ free(route);
+ route = next;
+ }
free(messenger);
}
}
@@ -734,16 +781,140 @@ static const char *default_port(const ch
else
return "5672";
}
-pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, char *address, char **name)
+
+static void pni_sub(pn_matcher_t *matcher, size_t group, const char *text, size_t matched)
+{
+ if (group > matcher->groups) {
+ matcher->groups = group;
+ }
+ matcher->group[group].start = text - matched;
+ matcher->group[group].size = matched;
+}
+
+static bool pni_match_r(pn_matcher_t *matcher, const char *pattern, const char *text, size_t group, size_t matched)
+{
+ bool match;
+
+ char p = *pattern;
+ char c = *text;
+
+ switch (p) {
+ case '\0': return c == '\0';
+ case '%':
+ case '*':
+ switch (c) {
+ case '\0':
+ match = pni_match_r(matcher, pattern + 1, text, group + 1, 0);
+ if (match) pni_sub(matcher, group, text, matched);
+ return match;
+ case '/':
+ if (p == '%') {
+ match = pni_match_r(matcher, pattern + 1, text, group + 1, 0);
+ if (match) pni_sub(matcher, group, text, matched);
+ return match;
+ }
+ default:
+ match = pni_match_r(matcher, pattern, text + 1, group, matched + 1);
+ if (!match) {
+ match = pni_match_r(matcher, pattern + 1, text, group + 1, 0);
+ if (match) pni_sub(matcher, group, text, matched);
+ }
+ return match;
+ }
+ default:
+ return c == p && pni_match_r(matcher, pattern + 1, text + 1, group, 0);
+ }
+}
+
+static bool pni_match(pn_matcher_t *matcher, const char *pattern, const char *text)
+{
+ matcher->groups = 0;
+ if (pni_match_r(matcher, pattern, text, 1, 0)) {
+ matcher->group[0].start = text;
+ matcher->group[0].size = strlen(text);
+ return true;
+ } else {
+ matcher->groups = 0;
+ return false;
+ }
+}
+
+static void pni_substitute(pn_matcher_t *matcher, const char *pattern, char *dest)
+{
+ while (*pattern) {
+ switch (*pattern) {
+ case '$':
+ pattern++;
+ if (*pattern == '$') {
+ *dest++ = *pattern++;
+ } else {
+ size_t idx = 0;
+ while (isdigit(*pattern)) {
+ idx *= 10;
+ idx += *pattern++ - '0';
+ }
+
+ if (idx <= matcher->groups) {
+ pn_group_t *group = &matcher->group[idx];
+ for (size_t i = 0; i < group->size; i++) {
+ *dest++ = group->start[i];
+ }
+ }
+ }
+ break;
+ default:
+ *dest++ = *pattern++;
+ break;
+ }
+ }
+
+ *dest = '\0';
+}
+
+static void pni_parse(pn_address_t *address)
+{
+ address->scheme = NULL;
+ address->user = NULL;
+ address->pass = NULL;
+ address->host = NULL;
+ address->port = NULL;
+ address->name = NULL;
+ parse_url(address->text, &address->scheme, &address->user, &address->pass,
+ &address->host, &address->port, &address->name);
+}
+
+static pn_route_t *pni_route(pn_messenger_t *messenger, const char *address)
+{
+ pn_address_t *addr = &messenger->address;
+ pn_route_t *route = messenger->routes;
+ while (route) {
+ if (pni_match(&messenger->matcher, route->pattern, address)) {
+ pni_substitute(&messenger->matcher, route->address, addr->text);
+ pni_parse(addr);
+ return route;
+ }
+ route = route->next;
+ }
+
+ strcpy(addr->text, address);
+ pni_parse(addr);
+
+ return NULL;
+}
+
+pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *address, char **name)
{
char domain[256];
if (sizeof(domain) < strlen(address) + 1) return NULL;
- char *scheme = NULL;
- char *user = NULL;
- char *pass = NULL;
- char *host = (char *) "0.0.0.0";
- char *port = NULL;
- parse_url(address, &scheme, &user, &pass, &host, &port, name);
+
+ pni_route(messenger, address);
+
+ char *scheme = messenger->address.scheme;
+ char *user = messenger->address.user;
+ char *pass = messenger->address.pass;
+ char *host = messenger->address.host;
+ char *port = messenger->address.port;
+ *name = messenger->address.name;
domain[0] = '\0';
@@ -809,15 +980,8 @@ void pn_subscription_set_context(pn_subs
pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, bool sender)
{
- char copy[256];
- if (sizeof(copy) <= (address ? strlen(address) : 0)) return NULL;
- if (address) {
- strcpy(copy, address);
- } else {
- copy[0] = '\0';
- }
char *name = NULL;
- pn_connection_t *connection = pn_messenger_resolve(messenger, copy, &name);
+ pn_connection_t *connection = pn_messenger_resolve(messenger, address, &name);
if (!connection) return NULL;
pn_link_t *link = pn_link_head(connection, PN_LOCAL_ACTIVE);
@@ -858,18 +1022,11 @@ pn_link_t *pn_messenger_target(pn_messen
pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
{
- char copy[256];
- if (strlen(source) >= sizeof(copy)) return NULL;
- strcpy(copy, source);
-
- char *scheme = NULL;
- char *user = NULL;
- char *pass = NULL;
- char *host = (char *) "0.0.0.0";
- char *port = NULL;
- char *path = NULL;
+ pni_route(messenger, source);
- parse_url(copy, &scheme, &user, &pass, &host, &port, &path);
+ char *scheme = messenger->address.scheme;
+ char *host = messenger->address.host;
+ char *port = messenger->address.port;
if (host[0] == '~') {
pn_listener_t *lnr = pn_listener(messenger->driver, host + 1,
@@ -1265,3 +1422,28 @@ int pn_messenger_incoming(pn_messenger_t
{
return pn_messenger_queued(messenger, false);
}
+
+int pn_messenger_route(pn_messenger_t *messenger, const char *pattern, const char *address)
+{
+ if (strlen(pattern) > PN_MAX_PATTERN || strlen(address) > PN_MAX_ROUTE) {
+ return PN_ERR;
+ }
+ pn_route_t *route = (pn_route_t *) malloc(sizeof(pn_route_t));
+ if (!route) return PN_ERR;
+
+ strcpy(route->pattern, pattern);
+ strcpy(route->address, address);
+ route->next = NULL;
+
+ pn_route_t *tail = messenger->routes;
+ if (!tail) {
+ messenger->routes = route;
+ } else {
+ while (tail->next) {
+ tail = tail->next;
+ }
+ tail->next = route;
+ }
+
+ return 0;
+}
Propchange: qpid/proton/trunk/proton-j/proton-api/src/main/resources/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Mar 23 16:29:53 2013
@@ -0,0 +1 @@
+*.class
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Sat Mar 23 16:29:53 2013
@@ -1048,6 +1048,9 @@ class Messenger(object):
def __init__(self, *args, **kwargs):
self.impl = messengerFactory.createMessenger()
+ def route(self, *args, **kwargs):
+ raise Skipped()
+
def start(self):
self.impl.start()
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1460189&r1=1460188&r2=1460189&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Sat Mar 23 16:29:53 2013
@@ -341,9 +341,9 @@ class MessengerTest(Test):
def testRoute(self):
self.server.subscribe("amqps://~0.0.0.0:12346")
+ self.start()
self.client.route("route1", "amqp://0.0.0.0:12345")
self.client.route("route2", "amqps://0.0.0.0:12346")
- self.start()
msg = Message()
msg.address = "route1"
@@ -364,8 +364,8 @@ class MessengerTest(Test):
assert reply.body == "test"
def testDefaultRoute(self):
- self.client.route("*", "amqp://0.0.0.0:12345")
self.start()
+ self.client.route("*", "amqp://0.0.0.0:12345")
msg = Message()
msg.address = "asdf"
@@ -379,8 +379,8 @@ class MessengerTest(Test):
assert reply.body == "test"
def testDefaultRouteSubstitution(self):
- self.client.route("*", "amqp://0.0.0.0:12345/$1")
self.start()
+ self.client.route("*", "amqp://0.0.0.0:12345/$1")
msg = Message()
msg.address = "asdf"
@@ -394,9 +394,9 @@ class MessengerTest(Test):
assert reply.body == "test"
def testIncomingRoute(self):
+ self.start()
self.client.route("in", "amqp://~0.0.0.0:12346")
self.client.subscribe("in")
- self.start()
msg = Message()
msg.address = "amqp://0.0.0.0:12345"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org