You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC
svn commit: r1525101 [10/21] - in /qpid/branches/linearstore/qpid: ./ bin/
cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2...
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/log.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/log.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/log.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/log.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/log_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/log_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/log_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/log_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/message.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/message.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/message.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/message.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,11 +25,35 @@
#include <string.h>
#include <stdio.h>
+static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
+static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70";
+static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
+static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71";
+static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
+static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72";
+static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
+static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73";
+static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
+static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74";
+static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
+static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75";
+static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
+static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76";
+static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
+static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77";
+static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
+static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78";
+static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0";
+static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1";
+static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0";
+static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
+
ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0);
ALLOC_DEFINE(dx_message_content_t);
+typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
-static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume)
+static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume, buffer_process_t handler, void *context)
{
unsigned char *local_cursor = *cursor;
dx_buffer_t *local_buffer = *buffer;
@@ -37,9 +61,13 @@ static void advance(unsigned char **curs
int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer));
while (consume > 0) {
if (consume < remaining) {
+ if (handler)
+ handler(context, local_cursor, consume);
local_cursor += consume;
consume = 0;
} else {
+ if (handler)
+ handler(context, local_cursor, remaining);
consume -= remaining;
local_buffer = local_buffer->next;
if (local_buffer == 0){
@@ -59,7 +87,7 @@ static void advance(unsigned char **curs
static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer)
{
unsigned char result = **cursor;
- advance(cursor, buffer, 1);
+ advance(cursor, buffer, 1, 0, 0);
return result;
}
@@ -68,7 +96,10 @@ static int traverse_field(unsigned char
{
unsigned char tag = next_octet(cursor, buffer);
if (!(*cursor)) return 0;
- int consume = 0;
+
+ int consume = 0;
+ size_t hdr_length = 1;
+
switch (tag & 0xF0) {
case 0x40 : consume = 0; break;
case 0x50 : consume = 1; break;
@@ -80,6 +111,7 @@ static int traverse_field(unsigned char
case 0xB0 :
case 0xD0 :
case 0xF0 :
+ hdr_length += 3;
consume |= ((int) next_octet(cursor, buffer)) << 24;
if (!(*cursor)) return 0;
consume |= ((int) next_octet(cursor, buffer)) << 16;
@@ -91,19 +123,21 @@ static int traverse_field(unsigned char
case 0xA0 :
case 0xC0 :
case 0xE0 :
+ hdr_length++;
consume |= (int) next_octet(cursor, buffer);
if (!(*cursor)) return 0;
break;
}
if (field && !field->parsed) {
- field->buffer = *buffer;
- field->offset = *cursor - dx_buffer_base(*buffer);
- field->length = consume;
- field->parsed = 1;
+ field->buffer = *buffer;
+ field->offset = *cursor - dx_buffer_base(*buffer);
+ field->length = consume;
+ field->hdr_length = hdr_length;
+ field->parsed = 1;
}
- advance(cursor, buffer, consume);
+ advance(cursor, buffer, consume, 0, 0);
return 1;
}
@@ -210,10 +244,11 @@ static int dx_check_and_advance(dx_buffe
//
// Pattern matched and tag is expected. Mark the beginning of the section.
//
- location->parsed = 1;
- location->buffer = test_buffer;
- location->offset = test_cursor - dx_buffer_base(test_buffer);
- location->length = 0;
+ location->parsed = 1;
+ location->buffer = test_buffer;
+ location->offset = test_cursor - dx_buffer_base(test_buffer);
+ location->length = 0;
+ location->hdr_length = pattern_length;
//
// Advance the pointers to consume the whole section.
@@ -249,7 +284,7 @@ static int dx_check_and_advance(dx_buffe
location->length = pre_consume + consume;
if (consume)
- advance(&test_cursor, &test_buffer, consume);
+ advance(&test_cursor, &test_buffer, consume, 0, 0);
*cursor = test_cursor;
*buffer = test_buffer;
@@ -318,6 +353,11 @@ static dx_field_location_t *dx_message_f
}
break;
+ case DX_FIELD_DELIVERY_ANNOTATION:
+ if (content->section_delivery_annotation.parsed)
+ return &content->section_delivery_annotation;
+ break;
+
case DX_FIELD_APPLICATION_PROPERTIES:
if (content->section_application_properties.parsed)
return &content->section_application_properties;
@@ -343,8 +383,7 @@ dx_message_t *dx_allocate_message()
return 0;
DEQ_ITEM_INIT(msg);
- msg->content = new_dx_message_content_t();
- msg->out_delivery = 0;
+ msg->content = new_dx_message_content_t();
if (msg->content == 0) {
free_dx_message_t((dx_message_t*) msg);
@@ -355,6 +394,7 @@ dx_message_t *dx_allocate_message()
msg->content->lock = sys_mutex();
msg->content->ref_count = 1;
msg->content->parse_depth = DX_DEPTH_NONE;
+ msg->content->parsed_delivery_annotations = 0;
return (dx_message_t*) msg;
}
@@ -371,14 +411,23 @@ void dx_free_message(dx_message_t *in_ms
sys_mutex_unlock(content->lock);
if (rc == 0) {
- dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+ if (content->parsed_delivery_annotations)
+ dx_parse_free(content->parsed_delivery_annotations);
+ dx_buffer_t *buf = DEQ_HEAD(content->buffers);
while (buf) {
DEQ_REMOVE_HEAD(content->buffers);
dx_free_buffer(buf);
buf = DEQ_HEAD(content->buffers);
}
+ buf = DEQ_HEAD(content->new_delivery_annotations);
+ while (buf) {
+ DEQ_REMOVE_HEAD(content->new_delivery_annotations);
+ dx_free_buffer(buf);
+ buf = DEQ_HEAD(content->new_delivery_annotations);
+ }
+
sys_mutex_free(content->lock);
free_dx_message_content_t(content);
}
@@ -397,8 +446,7 @@ dx_message_t *dx_message_copy(dx_message
return 0;
DEQ_ITEM_INIT(copy);
- copy->content = content;
- copy->out_delivery = 0;
+ copy->content = content;
sys_mutex_lock(content->lock);
content->ref_count++;
@@ -408,55 +456,58 @@ dx_message_t *dx_message_copy(dx_message
}
-void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery)
+dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *in_msg)
{
- ((dx_message_pvt_t*) msg)->out_delivery = delivery;
-}
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+ if (content->parsed_delivery_annotations)
+ return content->parsed_delivery_annotations;
-pn_delivery_t *dx_message_out_delivery(dx_message_t *msg)
-{
- return ((dx_message_pvt_t*) msg)->out_delivery;
-}
+ dx_field_iterator_t *da = dx_message_field_iterator(in_msg, DX_FIELD_DELIVERY_ANNOTATION);
+ if (da == 0)
+ return 0;
+ content->parsed_delivery_annotations = dx_parse(da);
+ if (content->parsed_delivery_annotations == 0 ||
+ !dx_parse_ok(content->parsed_delivery_annotations) ||
+ !dx_parse_is_map(content->parsed_delivery_annotations)) {
+ dx_field_iterator_free(da);
+ dx_parse_free(content->parsed_delivery_annotations);
+ return 0;
+ }
-void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery)
-{
- dx_message_content_t *content = MSG_CONTENT(msg);
- content->in_delivery = delivery;
+ return content->parsed_delivery_annotations;
}
-pn_delivery_t *dx_message_in_delivery(dx_message_t *msg)
+void dx_message_set_delivery_annotations(dx_message_t *msg, dx_composed_field_t *da)
{
- dx_message_content_t *content = MSG_CONTENT(msg);
- return content->in_delivery;
+ dx_message_content_t *content = MSG_CONTENT(msg);
+ dx_buffer_list_t *field_buffers = dx_compose_buffers(da);
+
+ assert(DEQ_SIZE(content->new_delivery_annotations) == 0);
+ content->new_delivery_annotations = *field_buffers;
+ DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
}
-dx_message_t *dx_message_receive(pn_delivery_t *delivery)
+dx_message_t *dx_message_receive(dx_delivery_t *delivery)
{
- pn_link_t *link = pn_delivery_link(delivery);
- dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery);
+ pn_delivery_t *pnd = dx_delivery_pn(delivery);
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) dx_delivery_context(delivery);
+ pn_link_t *link = pn_delivery_link(pnd);
ssize_t rc;
dx_buffer_t *buf;
//
// If there is no message associated with the delivery, this is the first time
- // we've received anything on this delivery. Allocate a message descriptor and
+ // we've received anything on this delivery. Allocate a message descriptor and
// link it and the delivery together.
//
if (!msg) {
msg = (dx_message_pvt_t*) dx_allocate_message();
- pn_delivery_set_context(delivery, (void*) msg);
-
- //
- // Record the incoming delivery only if it is not settled. If it is
- // settled, it should not be recorded as no future operations on it are
- // permitted.
- //
- if (!pn_delivery_settled(delivery))
- msg->content->in_delivery = delivery;
+ dx_delivery_set_context(delivery, (void*) msg);
}
//
@@ -489,6 +540,7 @@ dx_message_t *dx_message_receive(pn_deli
DEQ_REMOVE_TAIL(msg->content->buffers);
dx_free_buffer(buf);
}
+ dx_delivery_set_context(delivery, 0);
return (dx_message_t*) msg;
}
@@ -520,14 +572,76 @@ dx_message_t *dx_message_receive(pn_deli
}
-void dx_message_send(dx_message_t *in_msg, pn_link_t *link)
+static void send_handler(void *context, const unsigned char *start, int length)
{
- dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
- dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers);
+ pn_link_t *pnl = (pn_link_t*) context;
+ pn_link_send(pnl, (const char*) start, length);
+}
+
+
+void dx_message_send(dx_message_t *in_msg, dx_link_t *link)
+{
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+ dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+ unsigned char *cursor;
+ pn_link_t *pnl = dx_link_pn(link);
+
+ if (DEQ_SIZE(content->new_delivery_annotations) > 0) {
+ //
+ // This is the case where the delivery annotations have been modified.
+ // The message send must be divided into sections: The existing header;
+ // the new delivery annotations; the rest of the existing message.
+ // Note that the original delivery annotations that are still in the
+ // buffer chain must not be sent.
+ //
+ // Start by making sure that we've parsed the message sections through
+ // the delivery annotations
+ //
+ if (!dx_message_check(in_msg, DX_DEPTH_DELIVERY_ANNOTATIONS))
+ return;
+
+ //
+ // Send header if present
+ //
+ cursor = dx_buffer_base(buf);
+ if (content->section_message_header.length > 0) {
+ pn_link_send(pnl, (const char*) MSG_HDR_SHORT, 3);
+ buf = content->section_message_header.buffer;
+ cursor = content->section_message_header.offset + dx_buffer_base(buf);
+ advance(&cursor, &buf, content->section_message_header.length, send_handler, (void*) pnl);
+ }
+
+ //
+ // Send new delivery annotations
+ //
+ dx_buffer_t *da_buf = DEQ_HEAD(content->new_delivery_annotations);
+ while (da_buf) {
+ pn_link_send(pnl, (char*) dx_buffer_base(da_buf), dx_buffer_size(da_buf));
+ da_buf = DEQ_NEXT(da_buf);
+ }
+
+ //
+ // Skip over replaced delivery annotations
+ //
+ if (content->section_delivery_annotation.length > 0)
+ advance(&cursor, &buf,
+ content->section_delivery_annotation.hdr_length + content->section_delivery_annotation.length,
+ 0, 0);
+
+ //
+ // Send remaining partial buffer
+ //
+ if (buf) {
+ size_t len = dx_buffer_size(buf) - (cursor - dx_buffer_base(buf));
+ advance(&cursor, &buf, len, send_handler, (void*) pnl);
+ }
+
+ // Fall through to process the remaining buffers normally
+ }
- // TODO - Handle cases where annotations have been added or modified
while (buf) {
- pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
+ pn_link_send(pnl, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
buf = DEQ_NEXT(buf);
}
}
@@ -557,29 +671,6 @@ static int dx_check_field_LH(dx_message_
static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t depth)
{
- static const unsigned char * const MSG_HDR_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
- static const unsigned char * const MSG_HDR_SHORT = (unsigned char*) "\x00\x53\x70";
- static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
- static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x71";
- static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
- static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned char*) "\x00\x53\x72";
- static const unsigned char * const PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
- static const unsigned char * const PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x73";
- static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
- static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74";
- static const unsigned char * const BODY_DATA_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
- static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75";
- static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
- static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76";
- static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
- static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77";
- static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
- static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78";
- static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0";
- static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1";
- static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0";
- static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
-
dx_buffer_t *buffer = DEQ_HEAD(content->buffers);
if (!buffer)
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,10 +45,11 @@
*/
typedef struct {
- dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present
- size_t offset; // Offset in the buffer to the first octet
- size_t length; // Length of the field or zero if unneeded
- int parsed; // non-zero iff the buffer chain has been parsed to find this field
+ dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present
+ size_t offset; // Offset in the buffer to the first octet
+ size_t length; // Length of the field or zero if unneeded
+ size_t hdr_length; // Length of the field's header (not included in the length of the field)
+ int parsed; // non-zero iff the buffer chain has been parsed to find this field
} dx_field_location_t;
@@ -58,12 +59,15 @@ typedef struct {
// 1) Received message is held and forwarded unmodified - single buffer list
// 2) Received message is held and modified before forwarding - two buffer lists
// 3) Message is composed internally - single buffer list
+// TODO - provide a way to allocate a message without a lock for the link-routing case.
+// It's likely that link-routing will cause no contention for the message content.
+//
typedef struct {
sys_mutex_t *lock;
- uint32_t ref_count; // The number of qmessages referencing this
+ uint32_t ref_count; // The number of messages referencing this
dx_buffer_list_t buffers; // The buffer chain containing the message
- pn_delivery_t *in_delivery; // The delivery on which the message arrived
+ dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations
dx_field_location_t section_message_header; // The message header list
dx_field_location_t section_delivery_annotation; // The delivery annotation map
dx_field_location_t section_message_annotation; // The message annotation map
@@ -78,12 +82,12 @@ typedef struct {
dx_buffer_t *parse_buffer;
unsigned char *parse_cursor;
dx_message_depth_t parse_depth;
+ dx_parsed_field_t *parsed_delivery_annotations;
} dx_message_content_t;
typedef struct {
- DEQ_LINKS(dx_message_t); // Deq linkage that overlays the dx_message_t
+ DEQ_LINKS(dx_message_t); // Deque linkage that overlays the dx_message_t
dx_message_content_t *content;
- pn_delivery_t *out_delivery;
} dx_message_pvt_t;
ALLOC_DECLARE(dx_message_t);
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/parse.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/parse.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/parse.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/parse.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,13 +37,14 @@ ALLOC_DECLARE(dx_parsed_field_t);
ALLOC_DEFINE(dx_parsed_field_t);
-static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count)
+static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count, uint32_t *clen)
{
if (dx_field_iterator_end(iter))
return "Insufficient Data to Determine Tag";
- *tag = dx_field_iterator_octet(iter);
- *count = 0;
- *length = 0;
+ *tag = dx_field_iterator_octet(iter);
+ *count = 0;
+ *length = 0;
+ *clen = 0;
switch (*tag & 0xF0) {
case 0x40: *length = 0; break;
@@ -59,7 +60,7 @@ static char *get_type_info(dx_field_iter
*length += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
*length += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
// fall through to the next case
-
+
case 0xA0:
case 0xC0:
case 0xE0:
@@ -78,19 +79,24 @@ static char *get_type_info(dx_field_iter
*count += ((unsigned int) dx_field_iterator_octet(iter)) << 24;
*count += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
*count += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
+ *clen = 3;
// fall through to the next case
-
+
case 0xC0:
case 0xE0:
if (dx_field_iterator_end(iter))
return "Insufficient Data to Determine Count";
*count += (unsigned int) dx_field_iterator_octet(iter);
+ *clen += 1;
break;
}
if ((*tag == DX_AMQP_MAP8 || *tag == DX_AMQP_MAP32) && (*count & 1))
return "Odd Number of Elements in a Map";
+ if (*clen > *length)
+ return "Insufficient Length to Determine Count";
+
return 0;
}
@@ -108,13 +114,13 @@ static dx_parsed_field_t *dx_parse_inter
uint32_t length;
uint32_t count;
+ uint32_t length_of_count;
- field->parse_error = get_type_info(iter, &field->tag, &length, &count);
+ field->parse_error = get_type_info(iter, &field->tag, &length, &count, &length_of_count);
if (!field->parse_error) {
field->raw_iter = dx_field_iterator_sub(iter, length);
- if (count == 0 && length > 0)
- dx_field_iterator_advance(iter, length);
+ dx_field_iterator_advance(iter, length - length_of_count);
for (uint32_t idx = 0; idx < count; idx++) {
dx_parsed_field_t *child = dx_parse_internal(field->raw_iter, field);
DEQ_INSERT_TAIL(field->children, child);
@@ -377,4 +383,3 @@ dx_parsed_field_t *dx_parse_value_by_key
return 0;
}
-
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/posix/threading.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/posix/threading.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/posix/threading.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/posix/threading.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -103,7 +103,7 @@ void sys_cond_signal_all(sys_cond_t *con
struct sys_thread_t {
pthread_t thread;
-};
+};
sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg)
{
@@ -123,4 +123,3 @@ void sys_thread_join(sys_thread_t *threa
{
pthread_join(thread->thread, 0);
}
-
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/python_embedded.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/python_embedded.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/python_embedded.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/python_embedded.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -396,6 +396,7 @@ static PyTypeObject LogAdapterType = {
typedef struct {
PyObject_HEAD
PyObject *handler;
+ PyObject *handler_rx_call;
dx_dispatch_t *dx;
dx_address_t *address;
} IoAdapter;
@@ -403,9 +404,65 @@ typedef struct {
static void dx_io_rx_handler(void *context, dx_message_t *msg)
{
- //IoAdapter *self = (IoAdapter*) context;
+ IoAdapter *self = (IoAdapter*) context;
- // TODO - Parse the incoming message and send it to the python handler.
+ //
+ // Parse the message through the body and exit if the message is not well formed.
+ //
+ if (!dx_message_check(msg, DX_DEPTH_BODY))
+ return;
+
+ //
+ // Get an iterator for the application-properties. Exit if the message has none.
+ //
+ dx_field_iterator_t *ap = dx_message_field_iterator(msg, DX_FIELD_APPLICATION_PROPERTIES);
+ if (ap == 0)
+ return;
+
+ //
+ // Try to get a map-view of the application-properties.
+ //
+ dx_parsed_field_t *ap_map = dx_parse(ap);
+ if (ap_map == 0 || !dx_parse_ok(ap_map) || !dx_parse_is_map(ap_map)) {
+ dx_field_iterator_free(ap);
+ dx_parse_free(ap_map);
+ return;
+ }
+
+ //
+ // Get an iterator for the body. Exit if the message has none.
+ //
+ dx_field_iterator_t *body = dx_message_field_iterator(msg, DX_FIELD_BODY);
+ if (body == 0) {
+ dx_field_iterator_free(ap);
+ dx_parse_free(ap_map);
+ return;
+ }
+
+ //
+ // Try to get a map-view of the body.
+ //
+ dx_parsed_field_t *body_map = dx_parse(body);
+ if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) {
+ dx_field_iterator_free(ap);
+ dx_field_iterator_free(body);
+ dx_parse_free(ap_map);
+ dx_parse_free(body_map);
+ return;
+ }
+
+ PyObject *pAP = dx_field_to_py(ap_map);
+ PyObject *pBody = dx_field_to_py(body_map);
+
+ PyObject *pArgs = PyTuple_New(2);
+ PyTuple_SetItem(pArgs, 0, pAP);
+ PyTuple_SetItem(pArgs, 1, pBody);
+
+ PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs);
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
}
@@ -415,9 +472,14 @@ static int IoAdapter_init(IoAdapter *sel
if (!PyArg_ParseTuple(args, "Os", &self->handler, &address))
return -1;
+ self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive");
+ if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call))
+ return -1;
+
Py_INCREF(self->handler);
+ Py_INCREF(self->handler_rx_call);
self->dx = dispatch;
- self->address = dx_router_register_address(self->dx, true, address, dx_io_rx_handler, self);
+ self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self);
return 0;
}
@@ -426,24 +488,35 @@ static void IoAdapter_dealloc(IoAdapter*
{
dx_router_unregister_address(self->address);
Py_DECREF(self->handler);
+ Py_DECREF(self->handler_rx_call);
self->ob_type->tp_free((PyObject*)self);
}
static PyObject* dx_python_send(PyObject *self, PyObject *args)
{
- IoAdapter *ioa = (IoAdapter*) self;
- const char *address;
- PyObject *app_properties;
- PyObject *body;
+ IoAdapter *ioa = (IoAdapter*) self;
+ dx_composed_field_t *field = 0;
+ const char *address;
+ PyObject *app_properties;
+ PyObject *body;
+
if (!PyArg_ParseTuple(args, "sOO", &address, &app_properties, &body))
return 0;
- dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0);
+ field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field);
+ dx_compose_start_map(field);
+
+ dx_compose_insert_string(field, "qdx.ingress");
+ dx_compose_insert_string(field, dx_router_id(ioa->dx));
+
+ dx_compose_insert_string(field, "qdx.trace");
dx_compose_start_list(field);
- dx_compose_insert_bool(field, 0); // durable
+ dx_compose_insert_string(field, dx_router_id(ioa->dx));
dx_compose_end_list(field);
+ dx_compose_end_map(field);
+
field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
dx_compose_start_list(field);
dx_compose_insert_null(field); // message-id
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/router_node.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/router_node.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,7 @@
#include <qpid/dispatch/python_embedded.h>
#include <stdio.h>
#include <string.h>
+#include <stdbool.h>
#include <qpid/dispatch.h>
#include "dispatch_private.h"
@@ -28,123 +29,290 @@ static char *module = "ROUTER";
static void dx_router_python_setup(dx_router_t *router);
static void dx_pyrouter_tick(dx_router_t *router);
-//static char *local_prefix = "_local/";
-//static char *topo_prefix = "_topo/";
+static char *router_address = "_local/qdxrouter";
+static char *local_prefix = "_local/";
+//static char *topo_prefix = "_topo/";
/**
* Address Types and Processing:
*
- * Address Hash Key onReceive onEmit
- * =============================================================================
- * _local/<local> L<local> handler forward
- * _topo/<area>/<router>/<local> A<area> forward forward
- * _topo/<my-area>/<router>/<local> R<router> forward forward
- * _topo/<my-area>/<my-router>/<local> L<local> forward+handler forward
- * _topo/<area>/all/<local> A<area> forward forward
- * _topo/<my-area>/all/<local> L<local> forward+handler forward
- * _topo/all/all/<local> L<local> forward+handler forward
- * <mobile> M<mobile> forward+handler forward
+ * Address Hash Key onReceive
+ * ===================================================================
+ * _local/<local> L<local> handler
+ * _topo/<area>/<router>/<local> A<area> forward
+ * _topo/<my-area>/<router>/<local> R<router> forward
+ * _topo/<my-area>/<my-router>/<local> L<local> handler
+ * _topo/<area>/all/<local> A<area> forward
+ * _topo/<my-area>/all/<local> L<local> forward handler
+ * _topo/all/all/<local> L<local> forward handler
+ * <mobile> M<mobile> forward handler
*/
-struct dx_router_t {
- dx_dispatch_t *dx;
- const char *router_area;
- const char *router_id;
- dx_node_t *node;
- dx_link_list_t in_links;
- dx_link_list_t out_links;
- dx_message_list_t in_fifo;
- sys_mutex_t *lock;
- dx_timer_t *timer;
- hash_t *out_hash;
- uint64_t dtag;
- PyObject *pyRouter;
- PyObject *pyTick;
-};
+typedef struct dx_router_link_t dx_router_link_t;
+typedef struct dx_router_node_t dx_router_node_t;
-typedef struct {
- dx_link_t *link;
- dx_message_list_t out_fifo;
-} dx_router_link_t;
+
+typedef enum {
+ DX_LINK_ENDPOINT, // A link to a connected endpoint
+ DX_LINK_ROUTER, // A link to a peer router in the same area
+ DX_LINK_AREA // A link to a peer router in a different area (area boundary)
+} dx_link_type_t;
+
+
+typedef struct dx_routed_event_t {
+ DEQ_LINKS(struct dx_routed_event_t);
+ dx_delivery_t *delivery;
+ dx_message_t *message;
+ bool settle;
+ uint64_t disposition;
+} dx_routed_event_t;
+
+ALLOC_DECLARE(dx_routed_event_t);
+ALLOC_DEFINE(dx_routed_event_t);
+DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
+
+
+struct dx_router_link_t {
+ DEQ_LINKS(dx_router_link_t);
+ dx_direction_t link_direction;
+ dx_link_type_t link_type;
+ dx_address_t *owning_addr; // [ref] Address record that owns this link
+ dx_link_t *link; // [own] Link pointer
+ dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
+ dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
+ dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages)
+ dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries
+};
ALLOC_DECLARE(dx_router_link_t);
ALLOC_DEFINE(dx_router_link_t);
+DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
-
-typedef struct {
+struct dx_router_node_t {
+ DEQ_LINKS(dx_router_node_t);
const char *id;
- dx_router_link_t *next_hop;
+ dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
+ dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
// list of valid origins (pointers to router_node) - (bit masks?)
-} dx_router_node_t;
+};
ALLOC_DECLARE(dx_router_node_t);
ALLOC_DEFINE(dx_router_node_t);
+DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
struct dx_address_t {
- int is_local;
- dx_router_message_cb handler; // In-Process Consumer
- void *handler_context;
- dx_router_link_t *rlink; // Locally-Connected Consumer - TODO: Make this a list
- dx_router_node_t *rnode; // Remotely-Connected Consumer - TODO: Make this a list
+ dx_router_message_cb handler; // In-Process Consumer
+ void *handler_context;
+ dx_router_link_list_t rlinks; // Locally-Connected Consumers
+ dx_router_node_list_t rnodes; // Remotely-Connected Consumers
};
ALLOC_DECLARE(dx_address_t);
ALLOC_DEFINE(dx_address_t);
+struct dx_router_t {
+ dx_dispatch_t *dx;
+ const char *router_area;
+ const char *router_id;
+ dx_node_t *node;
+ dx_router_link_list_t in_links;
+ dx_router_node_list_t routers;
+ dx_message_list_t in_fifo;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+ hash_t *out_hash;
+ uint64_t dtag;
+ PyObject *pyRouter;
+ PyObject *pyTick;
+};
+
+
/**
- * Outbound Delivery Handler
+ * Outgoing Link Writable Handler
*/
-static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static int router_writable_link_handler(void* context, dx_link_t *link)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
- dx_message_t *msg;
- size_t size;
+ dx_router_t *router = (dx_router_t*) context;
+ dx_delivery_t *delivery;
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ pn_link_t *pn_link = dx_link_pn(link);
+ uint64_t tag;
+ int link_credit = pn_link_credit(pn_link);
+ dx_routed_event_list_t to_send;
+ dx_routed_event_list_t events;
+ dx_routed_event_t *re;
+ size_t offer;
+ int event_count = 0;
+
+ DEQ_INIT(to_send);
+ DEQ_INIT(events);
sys_mutex_lock(router->lock);
- msg = DEQ_HEAD(rlink->out_fifo);
- if (!msg) {
- // TODO - Recind the delivery
- sys_mutex_unlock(router->lock);
- return;
+
+ //
+ // Pull the non-delivery events into a local list so they can be processed without
+ // the lock being held.
+ //
+ re = DEQ_HEAD(rlink->event_fifo);
+ while (re) {
+ DEQ_REMOVE_HEAD(rlink->event_fifo);
+ DEQ_INSERT_TAIL(events, re);
+ re = DEQ_HEAD(rlink->event_fifo);
+ }
+
+ //
+ // Under lock, move available deliveries from the msg_fifo to the local to_send
+ // list. Don't move more than we have credit to send.
+ //
+ if (link_credit > 0) {
+ tag = router->dtag;
+ re = DEQ_HEAD(rlink->msg_fifo);
+ while (re) {
+ DEQ_REMOVE_HEAD(rlink->msg_fifo);
+ DEQ_INSERT_TAIL(to_send, re);
+ if (DEQ_SIZE(to_send) == link_credit)
+ break;
+ re = DEQ_HEAD(rlink->msg_fifo);
+ }
+ router->dtag += DEQ_SIZE(to_send);
}
- DEQ_REMOVE_HEAD(rlink->out_fifo);
- size = (DEQ_SIZE(rlink->out_fifo));
+ offer = DEQ_SIZE(rlink->msg_fifo);
sys_mutex_unlock(router->lock);
- dx_message_send(msg, pn_link);
+ //
+ // Deliver all the to_send messages downrange
+ //
+ re = DEQ_HEAD(to_send);
+ while (re) {
+ DEQ_REMOVE_HEAD(to_send);
+
+ //
+ // Get a delivery for the send. This will be the current deliver on the link.
+ //
+ tag++;
+ delivery = dx_delivery(link, pn_dtag((char*) &tag, 8));
+
+ //
+ // Send the message
+ //
+ dx_message_send(re->message, link);
+
+ //
+ // If there is an incoming delivery associated with this message, link it
+ // with the outgoing delivery. Otherwise, the message arrived pre-settled
+ // and should be sent presettled.
+ //
+ if (re->delivery) {
+ dx_delivery_set_peer(re->delivery, delivery);
+ dx_delivery_set_peer(delivery, re->delivery);
+ } else
+ dx_delivery_free(delivery, 0); // settle and free
+
+ pn_link_advance(pn_link);
+ event_count++;
+
+ dx_free_message(re->message);
+ free_dx_routed_event_t(re);
+ re = DEQ_HEAD(to_send);
+ }
//
- // If there is no incoming delivery, it was pre-settled. In this case,
- // we must pre-settle the outgoing delivery as well.
+ // Process the non-delivery events.
//
- if (dx_message_in_delivery(msg)) {
- pn_delivery_set_context(delivery, (void*) msg);
- dx_message_set_out_delivery(msg, delivery);
- } else {
- pn_delivery_settle(delivery);
- dx_free_message(msg);
+ re = DEQ_HEAD(events);
+ while (re) {
+ DEQ_REMOVE_HEAD(events);
+
+ if (re->delivery) {
+ if (re->disposition) {
+ pn_delivery_update(dx_delivery_pn(re->delivery), re->disposition);
+ event_count++;
+ }
+ if (re->settle) {
+ dx_delivery_free(re->delivery, 0);
+ event_count++;
+ }
+ }
+
+ free_dx_routed_event_t(re);
+ re = DEQ_HEAD(events);
}
- pn_link_advance(pn_link);
- pn_link_offered(pn_link, size);
+ //
+ // Set the offer to the number of messages remaining to be sent.
+ //
+ pn_link_offered(pn_link, offer);
+ return event_count;
+}
+
+
+static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
+{
+ dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
+ dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+
+ dx_parsed_field_t *trace = 0;
+ dx_parsed_field_t *ingress = 0;
+
+ if (in_da) {
+ trace = dx_parse_value_by_key(in_da, "qdx.trace");
+ ingress = dx_parse_value_by_key(in_da, "qdx.ingress");
+ }
+
+ dx_compose_start_map(out_da);
+
+ //
+ // If there is a trace field, append this router's ID to the trace.
+ //
+ if (trace && dx_parse_is_list(trace)) {
+ dx_compose_insert_string(out_da, "qdx.trace");
+ dx_compose_start_list(out_da);
+
+ uint32_t idx = 0;
+ dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
+ while (trace_item) {
+ dx_field_iterator_t *iter = dx_parse_raw(trace_item);
+ dx_compose_insert_string_iterator(out_da, iter);
+ idx++;
+ trace_item = dx_parse_sub_value(trace, idx);
+ }
+
+ dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_end_list(out_da);
+ }
+
+ //
+ // If there is no ingress field, annotate the ingress as this router else
+ // keep the original field.
+ //
+ dx_compose_insert_string(out_da, "qdx.ingress");
+ if (ingress && dx_parse_is_scalar(ingress)) {
+ dx_field_iterator_t *iter = dx_parse_raw(ingress);
+ dx_compose_insert_string_iterator(out_da, iter);
+ } else
+ dx_compose_insert_string(out_da, router->router_id);
+
+ dx_compose_end_map(out_da);
+
+ dx_message_set_delivery_annotations(msg, out_da);
+ dx_compose_free(out_da);
}
/**
* Inbound Delivery Handler
*/
-static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *delivery)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_message_t *msg;
- int valid_message = 0;
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = dx_link_pn(link);
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ dx_message_t *msg;
+ int valid_message = 0;
//
// Receive the message into a local representation. If the returned message
@@ -158,20 +326,63 @@ static void router_rx_handler(void* cont
return;
//
- // Validate the message through the Properties section
+ // Consume the delivery and issue a replacement credit
//
- valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
-
pn_link_advance(pn_link);
pn_link_flow(pn_link, 1);
+ sys_mutex_lock(router->lock);
+
+ //
+ // Handle the Link-Routing case. If this incoming link is associated with a connected
+ // link, simply deliver the message to the outgoing link. There is no need to validate
+ // the message in this case.
+ //
+ if (rlink->connected_link) {
+ dx_router_link_t *clink = rlink->connected_link;
+ dx_routed_event_t *re = new_dx_routed_event_t();
+
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = msg;
+ re->settle = false;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(clink->msg_fifo, re);
+
+ //
+ // If the incoming delivery is settled (pre-settled), don't link it into the routed
+ // event. If it's not settled, link it into the event for later handling.
+ //
+ if (dx_delivery_settled(delivery))
+ dx_delivery_free(delivery, 0);
+ else
+ re->delivery = delivery;
+
+ sys_mutex_unlock(router->lock);
+ dx_link_activate(clink->link);
+ return;
+ }
+
+ //
+ // We are performing Message-Routing, therefore we will need to validate the message
+ // through the Properties section so we can access the TO field.
+ //
+ dx_message_t *in_process_copy = 0;
+ dx_router_message_cb handler = 0;
+ void *handler_context = 0;
+
+ valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
+
if (valid_message) {
dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
dx_address_t *addr;
+ int fanout = 0;
+
if (iter) {
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- sys_mutex_lock(router->lock);
hash_retrieve(router->out_hash, iter, (void*) &addr);
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
+ int is_local = dx_field_iterator_prefix(iter, local_prefix);
dx_field_iterator_free(iter);
if (addr) {
@@ -179,108 +390,145 @@ static void router_rx_handler(void* cont
// To field is valid and contains a known destination. Handle the various
// cases for forwarding.
//
- // Forward to the in-process handler for this message if there is one.
- // Note: If the handler is going to queue the message for deferred processing,
- // it must copy the message. This function assumes that the handler
- // will process the message synchronously and be finished with it upon
- // completion.
- //
- if (addr->handler)
- addr->handler(addr->handler_context, msg);
//
- // Forward to the local link for the locally-connected consumer, if present.
- // TODO - Don't forward if this is a "_local" address.
+ // Interpret and update the delivery annotations of the message
//
- if (addr->rlink) {
- pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link);
- dx_message_t *copy = dx_message_copy(msg);
- DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
- pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
- dx_link_activate(addr->rlink->link);
- }
+ router_annotate_message(router, msg);
//
- // Forward to the next-hop for a remotely-connected consumer, if present.
- // Don't forward if this is a "_local" address.
+ // Forward to the in-process handler for this message if there is one. The
+ // actual invocation of the handler will occur later after we've released
+ // the lock.
//
- if (addr->rnode) {
- // TODO
+ if (addr->handler) {
+ in_process_copy = dx_message_copy(msg);
+ handler = addr->handler;
+ handler_context = addr->handler_context;
}
- } else {
//
- // To field contains an unknown address. Release the message.
+ // If the address form is local (i.e. is prefixed by _local), don't forward
+ // outside of the router process.
//
- pn_delivery_update(delivery, PN_RELEASED);
- pn_delivery_settle(delivery);
+ if (!is_local) {
+ //
+ // Forward to all of the local links receiving this address.
+ //
+ dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
+ while (dest_link) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1 && !dx_delivery_settled(delivery))
+ re->delivery = delivery;
+
+ dx_link_activate(dest_link->link);
+ dest_link = DEQ_NEXT(dest_link);
+ }
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
+ while (dest_node) {
+ if (dest_node->next_hop)
+ dest_link = dest_node->next_hop->peer_link;
+ else
+ dest_link = dest_node->peer_link;
+ if (dest_link) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1)
+ re->delivery = delivery;
+
+ dx_link_activate(dest_link->link);
+ }
+ dest_node = DEQ_NEXT(dest_node);
+ }
+ }
}
- sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
- dx_free_message(msg);
-
//
- // If this was a pre-settled delivery, we must also locally settle it.
+ // In message-routing mode, the handling of the incoming delivery depends on the
+ // number of copies of the received message that were forwarded.
//
- if (pn_delivery_settled(delivery))
- pn_delivery_settle(delivery);
+ if (handler) {
+ dx_delivery_free(delivery, PN_ACCEPTED);
+ } else if (fanout == 0) {
+ dx_delivery_free(delivery, PN_RELEASED);
+ } else if (fanout > 1)
+ dx_delivery_free(delivery, PN_ACCEPTED);
}
} else {
//
// Message is invalid. Reject the message.
//
- pn_delivery_update(delivery, PN_REJECTED);
- pn_delivery_settle(delivery);
- pn_delivery_set_context(delivery, 0);
- dx_free_message(msg);
+ dx_delivery_free(delivery, PN_REJECTED);
}
+
+ sys_mutex_unlock(router->lock);
+ dx_free_message(msg);
+
+ //
+ // Invoke the in-process handler now that the lock is released.
+ //
+ if (handler)
+ handler(handler_context, in_process_copy);
}
/**
* Delivery Disposition Handler
*/
-static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static void router_disp_handler(void* context, dx_link_t *link, dx_delivery_t *delivery)
{
- pn_link_t *pn_link = pn_delivery_link(delivery);
-
- if (pn_link_is_sender(pn_link)) {
- uint64_t disp = pn_delivery_remote_state(delivery);
- dx_message_t *msg = pn_delivery_get_context(delivery);
- pn_delivery_t *activate = 0;
-
- if (msg) {
- assert(delivery == dx_message_out_delivery(msg));
- if (disp != 0) {
- activate = dx_message_in_delivery(msg);
- pn_delivery_update(activate, disp);
- // TODO - handling of the data accompanying RECEIVED/MODIFIED
- }
+ dx_router_t *router = (dx_router_t*) context;
+ bool changed = dx_delivery_disp_changed(delivery);
+ uint64_t disp = dx_delivery_disp(delivery);
+ bool settled = dx_delivery_settled(delivery);
+ dx_delivery_t *peer = dx_delivery_peer(delivery);
- if (pn_delivery_settled(delivery)) {
- //
- // Downstream delivery has been settled. Propagate the settlement
- // upstream.
- //
- activate = dx_message_in_delivery(msg);
- pn_delivery_settle(activate);
- pn_delivery_settle(delivery);
- dx_free_message(msg);
- }
+ if (peer) {
+ //
+ // The case where this delivery has a peer.
+ //
+ if (changed || settled) {
+ dx_link_t *peer_link = dx_delivery_link(peer);
+ dx_router_link_t *prl = (dx_router_link_t*) dx_link_get_context(peer_link);
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = peer;
+ re->message = 0;
+ re->settle = settled;
+ re->disposition = changed ? disp : 0;
- if (activate) {
- //
- // Activate the upstream/incoming link so that the settlement will
- // get pushed out.
- //
- dx_link_t *act_link = (dx_link_t*) pn_link_get_context(pn_delivery_link(activate));
- dx_link_activate(act_link);
- }
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(prl->event_fifo, re);
+ sys_mutex_unlock(router->lock);
- return;
+ dx_link_activate(peer_link);
}
+
} else {
- // TODO - Handle disposition updates from upstream
+ //
+ // The no-peer case. Ignore status changes and echo settlement.
+ //
+ if (settled)
+ dx_delivery_free(delivery, 0);
}
}
@@ -290,25 +538,36 @@ static void router_disp_handler(void* co
*/
static int router_incoming_link_handler(void* context, dx_link_t *link)
{
- dx_router_t *router = (dx_router_t*) context;
- dx_link_item_t *item = new_dx_link_item_t();
- pn_link_t *pn_link = dx_link_pn(link);
-
- if (item) {
- DEQ_ITEM_INIT(item);
- item->link = link;
+ dx_router_t *router = (dx_router_t*) context;
+ dx_router_link_t *rlink = new_dx_router_link_t();
+ pn_link_t *pn_link = dx_link_pn(link);
- sys_mutex_lock(router->lock);
- DEQ_INSERT_TAIL(router->in_links, item);
- sys_mutex_unlock(router->lock);
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_INCOMING;
+ rlink->link_type = DX_LINK_ENDPOINT;
+ rlink->owning_addr = 0;
+ rlink->link = link;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
+
+ dx_link_set_context(link, rlink);
+
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(router->in_links, rlink);
+ sys_mutex_unlock(router->lock);
+
+ pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+ pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_link_flow(pn_link, 1000);
+ pn_link_open(pn_link);
+
+ //
+ // TODO - If the address has link-route semantics, create all associated
+ // links needed to go with this one.
+ //
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
- pn_link_flow(pn_link, 1000);
- pn_link_open(pn_link);
- } else {
- pn_link_close(pn_link);
- }
return 0;
}
@@ -327,73 +586,45 @@ static int router_outgoing_link_handler(
return 0;
}
- dx_router_link_t *rlink = new_dx_router_link_t();
- rlink->link = link;
- DEQ_INIT(rlink->out_fifo);
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+ dx_router_link_t *rlink = new_dx_router_link_t();
+
+ int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address);
+
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_OUTGOING;
+ rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
+ rlink->link = link;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
+
dx_link_set_context(link, rlink);
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
dx_address_t *addr;
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
-
sys_mutex_lock(router->lock);
hash_retrieve(router->out_hash, iter, (void**) &addr);
if (!addr) {
addr = new_dx_address_t();
- addr->is_local = 0;
addr->handler = 0;
addr->handler_context = 0;
- addr->rlink = 0;
- addr->rnode = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
hash_insert(router->out_hash, iter, addr);
}
dx_field_iterator_free(iter);
- if (addr->rlink == 0) {
- addr->rlink = rlink;
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
- pn_link_open(pn_link);
- sys_mutex_unlock(router->lock);
- dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
- return 0;
- }
-
- dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt);
- pn_link_close(pn_link);
- sys_mutex_unlock(router->lock);
- return 0;
-}
-
-
-/**
- * Outgoing Link Writable Handler
- */
-static int router_writable_link_handler(void* context, dx_link_t *link)
-{
- dx_router_t *router = (dx_router_t*) context;
- int grant_delivery = 0;
- pn_delivery_t *delivery;
- dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
- pn_link_t *pn_link = dx_link_pn(link);
- uint64_t tag;
+ rlink->owning_addr = addr;
+ DEQ_INSERT_TAIL(addr->rlinks, rlink);
- sys_mutex_lock(router->lock);
- if (DEQ_SIZE(rlink->out_fifo) > 0) {
- grant_delivery = 1;
- tag = router->dtag++;
- }
+ pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+ pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_link_open(pn_link);
sys_mutex_unlock(router->lock);
-
- if (grant_delivery) {
- pn_delivery(pn_link, pn_dtag((char*) &tag, 8));
- delivery = pn_link_current(pn_link);
- if (delivery) {
- router_tx_handler(context, link, delivery);
- return 1;
- }
- }
-
+ dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
return 0;
}
@@ -403,44 +634,37 @@ static int router_writable_link_handler(
*/
static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = dx_link_pn(link);
- const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
- dx_link_item_t *item;
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = dx_link_pn(link);
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
- if (!r_tgt)
+ if (!rlink)
return 0;
sys_mutex_lock(router->lock);
if (pn_link_is_sender(pn_link)) {
- item = DEQ_HEAD(router->out_links);
+ DEQ_REMOVE(rlink->owning_addr->rlinks, rlink);
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
- dx_address_t *addr;
- if (iter) {
- hash_retrieve(router->out_hash, iter, (void**) &addr);
- if (addr) {
- hash_remove(router->out_hash, iter);
- free_dx_router_link_t(addr->rlink);
- free_dx_address_t(addr);
- dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+ if ((rlink->owning_addr->handler == 0) &&
+ (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) &&
+ (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) {
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
+ dx_address_t *addr;
+ if (iter) {
+ hash_retrieve(router->out_hash, iter, (void**) &addr);
+ if (addr == rlink->owning_addr) {
+ hash_remove(router->out_hash, iter);
+ free_dx_router_link_t(rlink);
+ free_dx_address_t(addr);
+ dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+ }
+ dx_field_iterator_free(iter);
}
- dx_field_iterator_free(iter);
- }
- }
- else
- item = DEQ_HEAD(router->in_links);
-
- while (item) {
- if (item->link == link) {
- if (pn_link_is_sender(pn_link))
- DEQ_REMOVE(router->out_links, item);
- else
- DEQ_REMOVE(router->in_links, item);
- free_dx_link_item_t(item);
- break;
}
- item = item->next;
+ } else {
+ DEQ_REMOVE(router->in_links, rlink);
+ free_dx_router_link_t(rlink);
}
sys_mutex_unlock(router->lock);
@@ -455,6 +679,83 @@ static void router_inbound_open_handler(
static void router_outbound_open_handler(void *type_context, dx_connection_t *conn)
{
+ // TODO - Make sure this connection is annotated as an inter-router transport.
+ // Ignore otherwise
+
+ dx_router_t *router = (dx_router_t*) type_context;
+ dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH);
+ dx_link_t *sender;
+ dx_link_t *receiver;
+ dx_router_link_t *rlink;
+
+ //
+ // Create an incoming link and put it in the in-links collection. The address
+ // of the remote source of this link is '_local/qdxrouter'.
+ //
+ receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx");
+ pn_terminus_set_address(dx_link_remote_source(receiver), router_address);
+ pn_terminus_set_address(dx_link_target(receiver), router_address);
+
+ rlink = new_dx_router_link_t();
+
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_INCOMING;
+ rlink->link_type = DX_LINK_ROUTER;
+ rlink->owning_addr = 0;
+ rlink->link = receiver;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
+
+ dx_link_set_context(receiver, rlink);
+
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(router->in_links, rlink);
+ sys_mutex_unlock(router->lock);
+
+ //
+ // Create an outgoing link with a local source of '_local/qdxrouter' and place
+ // it in the routing table.
+ //
+ sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx");
+ pn_terminus_set_address(dx_link_remote_target(sender), router_address);
+ pn_terminus_set_address(dx_link_source(sender), router_address);
+
+ rlink = new_dx_router_link_t();
+
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_OUTGOING;
+ rlink->link_type = DX_LINK_ROUTER;
+ rlink->link = sender;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
+
+ dx_link_set_context(sender, rlink);
+
+ dx_address_t *addr;
+
+ sys_mutex_lock(router->lock);
+ hash_retrieve(router->out_hash, aiter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->out_hash, aiter, addr);
+ }
+
+ rlink->owning_addr = addr;
+ DEQ_INSERT_TAIL(addr->rlinks, rlink);
+ sys_mutex_unlock(router->lock);
+
+ pn_link_open(dx_link_pn(receiver));
+ pn_link_open(dx_link_pn(sender));
+ pn_link_flow(dx_link_pn(receiver), 1000);
+ dx_field_iterator_free(aiter);
}
@@ -473,7 +774,6 @@ static void dx_router_timer_handler(void
static dx_node_type_t router_node = {"router", 0, 0,
router_rx_handler,
- router_tx_handler,
router_disp_handler,
router_incoming_link_handler,
router_outgoing_link_handler,
@@ -494,22 +794,23 @@ dx_router_t *dx_router(dx_dispatch_t *dx
}
dx_router_t *router = NEW(dx_router_t);
- dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
+ router_node.type_context = router;
+
+ router->dx = dx;
+ router->router_area = area;
+ router->router_id = id;
+ router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
DEQ_INIT(router->in_links);
- DEQ_INIT(router->out_links);
+ DEQ_INIT(router->routers);
DEQ_INIT(router->in_fifo);
+ router->lock = sys_mutex();
+ router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
+ router->out_hash = hash(10, 32, 0);
+ router->dtag = 1;
+ router->pyRouter = 0;
+ router->pyTick = 0;
- router->dx = dx;
- router->lock = sys_mutex();
- router->router_area = area;
- router->router_id = id;
-
- router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
-
- router->out_hash = hash(10, 32, 0);
- router->dtag = 1;
- router->pyRouter = 0;
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -546,47 +847,52 @@ void dx_router_free(dx_router_t *router)
}
+const char *dx_router_id(const dx_dispatch_t *dx)
+{
+ dx_router_t *router = dx->router;
+ return router->router_id;
+}
+
+
dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
- bool is_local,
const char *address,
dx_router_message_cb handler,
void *context)
{
- char addr[1000];
- dx_address_t *ad = new_dx_address_t();
+ char addr_string[1000];
+ dx_router_t *router = dx->router;
+ dx_address_t *addr;
dx_field_iterator_t *iter;
- int result;
- if (!ad)
- return 0;
+ strcpy(addr_string, "L"); // Local Hash-Key Space
+ strcat(addr_string, address);
+ iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
- ad->is_local = is_local;
- ad->handler = handler;
- ad->handler_context = context;
- ad->rlink = 0;
-
- if (ad->is_local)
- strcpy(addr, "L"); // Local Hash-Key Space
- else
- strcpy(addr, "M"); // Mobile Hash-Key Space
-
- strcat(addr, address);
- iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST);
- result = hash_insert(dx->router->out_hash, iter, ad);
- dx_field_iterator_free(iter);
- if (result != 0) {
- free_dx_address_t(ad);
- return 0;
+ sys_mutex_lock(router->lock);
+ hash_retrieve(router->out_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->out_hash, iter, addr);
}
+ dx_field_iterator_free(iter);
+
+ addr->handler = handler;
+ addr->handler_context = context;
+
+ sys_mutex_unlock(router->lock);
dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
- return ad;
+ return addr;
}
void dx_router_unregister_address(dx_address_t *ad)
{
- free_dx_address_t(ad);
+ //free_dx_address_t(ad);
}
@@ -601,12 +907,43 @@ void dx_router_send(dx_dispatch_t
sys_mutex_lock(router->lock);
hash_retrieve(router->out_hash, address, (void*) &addr);
if (addr) {
- if (addr->rlink) {
- pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link);
- dx_message_t *copy = dx_message_copy(msg);
- DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
- pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
- dx_link_activate(addr->rlink->link);
+ //
+ // Forward to all of the local links receiving this address.
+ //
+ dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
+ while (dest_link) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ dx_link_activate(dest_link->link);
+ dest_link = DEQ_NEXT(dest_link);
+ }
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
+ while (dest_node) {
+ if (dest_node->next_hop)
+ dest_link = dest_node->next_hop->peer_link;
+ else
+ dest_link = dest_node->peer_link;
+ if (dest_link) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+ dx_link_activate(dest_link->link);
+ }
+ dest_node = DEQ_NEXT(dest_node);
}
}
sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
@@ -633,6 +970,24 @@ typedef struct {
} RouterAdapter;
+static PyObject* dx_router_node_updated(PyObject *self, PyObject *args)
+{
+ //RouterAdapter *adapter = (RouterAdapter*) self;
+ //dx_router_t *router = adapter->router;
+ const char *address;
+ int is_reachable;
+ int is_neighbor;
+
+ if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor))
+ return 0;
+
+ // TODO
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
{
//RouterAdapter *adapter = (RouterAdapter*) self;
@@ -666,8 +1021,9 @@ static PyObject* dx_router_del_route(PyO
static PyMethodDef RouterAdapter_methods[] = {
- {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
- {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
+ {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"},
+ {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
+ {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
{0, 0, 0, 0}
};
@@ -747,7 +1103,7 @@ static void dx_router_python_setup(dx_ro
PyObject* pClass;
PyObject* pArgs;
- pName = PyString_FromString("router");
+ pName = PyString_FromString("qpid.dispatch.router");
pModule = PyImport_Import(pName);
Py_DECREF(pName);
if (!pModule) {
@@ -809,14 +1165,16 @@ static void dx_pyrouter_tick(dx_router_t
PyObject *pArgs;
PyObject *pValue;
- pArgs = PyTuple_New(0);
- pValue = PyObject_CallObject(router->pyTick, pArgs);
- if (PyErr_Occurred()) {
- PyErr_Print();
- }
- Py_DECREF(pArgs);
- if (pValue) {
- Py_DECREF(pValue);
+ if (router->pyTick) {
+ pArgs = PyTuple_New(0);
+ pValue = PyObject_CallObject(router->pyTick, pArgs);
+ if (PyErr_Occurred()) {
+ PyErr_Print();
+ }
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
}
}
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/server.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/server.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/server.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/server.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -91,26 +91,32 @@ static dx_thread_t *thread(dx_server_t *
}
-static void thread_process_listeners(pn_driver_t *driver)
+static void thread_process_listeners(dx_server_t *dx_server)
{
+ pn_driver_t *driver = dx_server->driver;
pn_listener_t *listener = pn_driver_listener(driver);
pn_connector_t *cxtr;
dx_connection_t *ctx;
while (listener) {
- dx_log(module, LOG_TRACE, "Accepting Connection");
cxtr = pn_listener_accept(listener);
+ dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr));
ctx = new_dx_connection_t();
ctx->state = CONN_STATE_OPENING;
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
ctx->pn_cxtr = cxtr;
- ctx->pn_conn = 0;
ctx->listener = (dx_listener_t*) pn_listener_context(listener);
ctx->connector = 0;
ctx->context = ctx->listener->context;
ctx->ufd = 0;
+ pn_connection_t *conn = pn_connection();
+ pn_connection_set_container(conn, dx_server->container_name);
+ pn_connector_set_connection(cxtr, conn);
+ pn_connection_set_context(conn, ctx);
+ ctx->pn_conn = conn;
+
//
// Get a pointer to the transport so we can insert security components into it
//
@@ -201,20 +207,12 @@ static int process_connector(dx_server_t
// Call the handler that is appropriate for the connector's state.
//
switch (ctx->state) {
- case CONN_STATE_CONNECTING:
- if (!pn_connector_closed(cxtr)) {
- //ctx->state = CONN_STATE_SASL_CLIENT;
- assert(ctx->connector);
- ctx->connector->state = CXTR_STATE_OPEN;
- events = 1;
- } else {
+ case CONN_STATE_CONNECTING: {
+ if (pn_connector_closed(cxtr)) {
ctx->state = CONN_STATE_FAILED;
events = 0;
+ break;
}
- break;
-
- case CONN_STATE_OPENING:
- ctx->state = CONN_STATE_OPERATIONAL;
pn_connection_t *conn = pn_connection();
pn_connection_set_container(conn, dx_server->container_name);
@@ -222,20 +220,71 @@ static int process_connector(dx_server_t
pn_connection_set_context(conn, ctx);
ctx->pn_conn = conn;
- dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+ pn_transport_t *tport = pn_connector_transport(cxtr);
+ const dx_server_config_t *config = ctx->connector->config;
- if (ctx->listener) {
- ce = DX_CONN_EVENT_LISTENER_OPEN;
- } else if (ctx->connector) {
- ce = DX_CONN_EVENT_CONNECTOR_OPEN;
- ctx->connector->delay = 0;
- } else
- assert(0);
+ //
+ // Set up SSL if appropriate
+ //
+ if (config->ssl_enabled) {
+ pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
+ pn_ssl_domain_set_credentials(domain,
+ config->ssl_certificate_file,
+ config->ssl_private_key_file,
+ config->ssl_password);
+
+ if (config->ssl_require_peer_authentication)
+ pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db);
+
+ pn_ssl_t *ssl = pn_ssl(tport);
+ pn_ssl_init(ssl, domain, 0);
+ pn_ssl_domain_free(domain);
+ }
- dx_server->conn_handler(dx_server->conn_handler_context,
- ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ //
+ // Set up SASL
+ //
+ pn_sasl_t *sasl = pn_sasl(tport);
+ pn_sasl_mechanisms(sasl, config->sasl_mechanisms);
+ pn_sasl_client(sasl);
+
+ ctx->state = CONN_STATE_OPENING;
+ assert(ctx->connector);
+ ctx->connector->state = CXTR_STATE_OPEN;
events = 1;
break;
+ }
+
+ case CONN_STATE_OPENING: {
+ pn_transport_t *tport = pn_connector_transport(cxtr);
+ pn_sasl_t *sasl = pn_sasl(tport);
+
+ if (pn_sasl_outcome(sasl) == PN_SASL_OK) {
+ ctx->state = CONN_STATE_OPERATIONAL;
+
+ dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+
+ if (ctx->listener) {
+ ce = DX_CONN_EVENT_LISTENER_OPEN;
+ } else if (ctx->connector) {
+ ce = DX_CONN_EVENT_CONNECTOR_OPEN;
+ ctx->connector->delay = 0;
+ } else
+ assert(0);
+
+ dx_server->conn_handler(dx_server->conn_handler_context,
+ ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ events = 1;
+ break;
+ }
+ else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) {
+ ctx->state = CONN_STATE_FAILED;
+ if (ctx->connector) {
+ const dx_server_config_t *config = ctx->connector->config;
+ dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+ }
+ }
+ }
case CONN_STATE_OPERATIONAL:
if (pn_connector_closed(cxtr)) {
@@ -323,20 +372,35 @@ static void *thread_run(void *arg)
//
// Service pending timers.
//
- dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
- if (timer) {
- DEQ_REMOVE_HEAD(dx_server->pending_timers);
-
- //
- // Mark the timer as idle in case it reschedules itself.
- //
- dx_timer_idle_LH(timer);
+ if (DEQ_SIZE(dx_server->pending_timers) > 0) {
+ dx_timer_list_t local_list;
+ dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
+
+ DEQ_INIT(local_list);
+ while (timer) {
+ DEQ_REMOVE_HEAD(dx_server->pending_timers);
+ DEQ_INSERT_TAIL(local_list, timer);
+ timer = DEQ_HEAD(dx_server->pending_timers);
+ }
//
- // Release the lock and invoke the connection handler.
+ // Release the lock and invoke the connection handlers.
//
sys_mutex_unlock(dx_server->lock);
- timer->handler(timer->context);
+
+ timer = DEQ_HEAD(local_list);
+ while (timer) {
+ DEQ_REMOVE_HEAD(local_list);
+
+ //
+ // Mark the timer as idle in case it reschedules itself.
+ //
+ dx_timer_idle_LH(timer);
+
+ timer->handler(timer->context);
+ timer = DEQ_HEAD(local_list);
+ }
+
pn_driver_wakeup(dx_server->driver);
continue;
}
@@ -411,7 +475,7 @@ static void *thread_run(void *arg)
//
// Process listeners (incoming connections).
//
- thread_process_listeners(dx_server->driver);
+ thread_process_listeners(dx_server);
//
// Traverse the list of connectors-needing-service from the proton driver.
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/server_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/server_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/server_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/timer.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/timer.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/timer.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/timer.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org