You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC

svn commit: r1525101 [10/21] - in /qpid/branches/linearstore/qpid: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2...

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/log.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/log.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/log.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/log.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/log_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/log_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/log_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/log_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/message.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/message.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/message.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/message.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,11 +25,35 @@
 #include <string.h>
 #include <stdio.h>
 
+static const unsigned char * const MSG_HDR_LONG                 = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
+static const unsigned char * const MSG_HDR_SHORT                = (unsigned char*) "\x00\x53\x70";
+static const unsigned char * const DELIVERY_ANNOTATION_LONG     = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
+static const unsigned char * const DELIVERY_ANNOTATION_SHORT    = (unsigned char*) "\x00\x53\x71";
+static const unsigned char * const MESSAGE_ANNOTATION_LONG      = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
+static const unsigned char * const MESSAGE_ANNOTATION_SHORT     = (unsigned char*) "\x00\x53\x72";
+static const unsigned char * const PROPERTIES_LONG              = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
+static const unsigned char * const PROPERTIES_SHORT             = (unsigned char*) "\x00\x53\x73";
+static const unsigned char * const APPLICATION_PROPERTIES_LONG  = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
+static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74";
+static const unsigned char * const BODY_DATA_LONG               = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
+static const unsigned char * const BODY_DATA_SHORT              = (unsigned char*) "\x00\x53\x75";
+static const unsigned char * const BODY_SEQUENCE_LONG           = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
+static const unsigned char * const BODY_SEQUENCE_SHORT          = (unsigned char*) "\x00\x53\x76";
+static const unsigned char * const BODY_VALUE_LONG              = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
+static const unsigned char * const BODY_VALUE_SHORT             = (unsigned char*) "\x00\x53\x77";
+static const unsigned char * const FOOTER_LONG                  = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
+static const unsigned char * const FOOTER_SHORT                 = (unsigned char*) "\x00\x53\x78";
+static const unsigned char * const TAGS_LIST                    = (unsigned char*) "\x45\xc0\xd0";
+static const unsigned char * const TAGS_MAP                     = (unsigned char*) "\xc1\xd1";
+static const unsigned char * const TAGS_BINARY                  = (unsigned char*) "\xa0\xb0";
+static const unsigned char * const TAGS_ANY                     = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
+
 ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0);
 ALLOC_DEFINE(dx_message_content_t);
 
+typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
 
-static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume)
+static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume, buffer_process_t handler, void *context)
 {
     unsigned char *local_cursor = *cursor;
     dx_buffer_t   *local_buffer = *buffer;
@@ -37,9 +61,13 @@ static void advance(unsigned char **curs
     int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer));
     while (consume > 0) {
         if (consume < remaining) {
+            if (handler)
+                handler(context, local_cursor, consume);
             local_cursor += consume;
             consume = 0;
         } else {
+            if (handler)
+                handler(context, local_cursor, remaining);
             consume -= remaining;
             local_buffer = local_buffer->next;
             if (local_buffer == 0){
@@ -59,7 +87,7 @@ static void advance(unsigned char **curs
 static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer)
 {
     unsigned char result = **cursor;
-    advance(cursor, buffer, 1);
+    advance(cursor, buffer, 1, 0, 0);
     return result;
 }
 
@@ -68,7 +96,10 @@ static int traverse_field(unsigned char 
 {
     unsigned char tag = next_octet(cursor, buffer);
     if (!(*cursor)) return 0;
-    int consume = 0;
+
+    int    consume    = 0;
+    size_t hdr_length = 1;
+
     switch (tag & 0xF0) {
     case 0x40 : consume = 0;  break;
     case 0x50 : consume = 1;  break;
@@ -80,6 +111,7 @@ static int traverse_field(unsigned char 
     case 0xB0 :
     case 0xD0 :
     case 0xF0 :
+        hdr_length += 3;
         consume |= ((int) next_octet(cursor, buffer)) << 24;
         if (!(*cursor)) return 0;
         consume |= ((int) next_octet(cursor, buffer)) << 16;
@@ -91,19 +123,21 @@ static int traverse_field(unsigned char 
     case 0xA0 :
     case 0xC0 :
     case 0xE0 :
+        hdr_length++;
         consume |= (int) next_octet(cursor, buffer);
         if (!(*cursor)) return 0;
         break;
     }
 
     if (field && !field->parsed) {
-        field->buffer = *buffer;
-        field->offset = *cursor - dx_buffer_base(*buffer);
-        field->length = consume;
-        field->parsed = 1;
+        field->buffer     = *buffer;
+        field->offset     = *cursor - dx_buffer_base(*buffer);
+        field->length     = consume;
+        field->hdr_length = hdr_length;
+        field->parsed     = 1;
     }
 
-    advance(cursor, buffer, consume);
+    advance(cursor, buffer, consume, 0, 0);
     return 1;
 }
 
@@ -210,10 +244,11 @@ static int dx_check_and_advance(dx_buffe
     //
     // Pattern matched and tag is expected.  Mark the beginning of the section.
     //
-    location->parsed = 1;
-    location->buffer = test_buffer;
-    location->offset = test_cursor - dx_buffer_base(test_buffer);
-    location->length = 0;
+    location->parsed     = 1;
+    location->buffer     = test_buffer;
+    location->offset     = test_cursor - dx_buffer_base(test_buffer);
+    location->length     = 0;
+    location->hdr_length = pattern_length;
 
     //
     // Advance the pointers to consume the whole section.
@@ -249,7 +284,7 @@ static int dx_check_and_advance(dx_buffe
 
     location->length = pre_consume + consume;
     if (consume)
-        advance(&test_cursor, &test_buffer, consume);
+        advance(&test_cursor, &test_buffer, consume, 0, 0);
 
     *cursor = test_cursor;
     *buffer = test_buffer;
@@ -318,6 +353,11 @@ static dx_field_location_t *dx_message_f
         }
         break;
 
+    case DX_FIELD_DELIVERY_ANNOTATION:
+        if (content->section_delivery_annotation.parsed)
+            return &content->section_delivery_annotation;
+        break;
+
     case DX_FIELD_APPLICATION_PROPERTIES:
         if (content->section_application_properties.parsed)
             return &content->section_application_properties;
@@ -343,8 +383,7 @@ dx_message_t *dx_allocate_message()
         return 0;
 
     DEQ_ITEM_INIT(msg);
-    msg->content      = new_dx_message_content_t();
-    msg->out_delivery = 0;
+    msg->content = new_dx_message_content_t();
 
     if (msg->content == 0) {
         free_dx_message_t((dx_message_t*) msg);
@@ -355,6 +394,7 @@ dx_message_t *dx_allocate_message()
     msg->content->lock        = sys_mutex();
     msg->content->ref_count   = 1;
     msg->content->parse_depth = DX_DEPTH_NONE;
+    msg->content->parsed_delivery_annotations = 0;
 
     return (dx_message_t*) msg;
 }
@@ -371,14 +411,23 @@ void dx_free_message(dx_message_t *in_ms
     sys_mutex_unlock(content->lock);
 
     if (rc == 0) {
-        dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+        if (content->parsed_delivery_annotations)
+            dx_parse_free(content->parsed_delivery_annotations);
 
+        dx_buffer_t *buf = DEQ_HEAD(content->buffers);
         while (buf) {
             DEQ_REMOVE_HEAD(content->buffers);
             dx_free_buffer(buf);
             buf = DEQ_HEAD(content->buffers);
         }
 
+        buf = DEQ_HEAD(content->new_delivery_annotations);
+        while (buf) {
+            DEQ_REMOVE_HEAD(content->new_delivery_annotations);
+            dx_free_buffer(buf);
+            buf = DEQ_HEAD(content->new_delivery_annotations);
+        }
+
         sys_mutex_free(content->lock);
         free_dx_message_content_t(content);
     }
@@ -397,8 +446,7 @@ dx_message_t *dx_message_copy(dx_message
         return 0;
 
     DEQ_ITEM_INIT(copy);
-    copy->content      = content;
-    copy->out_delivery = 0;
+    copy->content = content;
 
     sys_mutex_lock(content->lock);
     content->ref_count++;
@@ -408,55 +456,58 @@ dx_message_t *dx_message_copy(dx_message
 }
 
 
-void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery)
+dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *in_msg)
 {
-    ((dx_message_pvt_t*) msg)->out_delivery = delivery;
-}
+    dx_message_pvt_t     *msg     = (dx_message_pvt_t*) in_msg;
+    dx_message_content_t *content = msg->content;
 
+    if (content->parsed_delivery_annotations)
+        return content->parsed_delivery_annotations;
 
-pn_delivery_t *dx_message_out_delivery(dx_message_t *msg)
-{
-    return ((dx_message_pvt_t*) msg)->out_delivery;
-}
+    dx_field_iterator_t *da = dx_message_field_iterator(in_msg, DX_FIELD_DELIVERY_ANNOTATION);
+    if (da == 0)
+        return 0;
 
+    content->parsed_delivery_annotations = dx_parse(da);
+    if (content->parsed_delivery_annotations == 0 ||
+        !dx_parse_ok(content->parsed_delivery_annotations) ||
+        !dx_parse_is_map(content->parsed_delivery_annotations)) {
+        dx_field_iterator_free(da);
+        dx_parse_free(content->parsed_delivery_annotations);
+        return 0;
+    }
 
-void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery)
-{
-    dx_message_content_t *content = MSG_CONTENT(msg);
-    content->in_delivery = delivery;
+    return content->parsed_delivery_annotations;
 }
 
 
-pn_delivery_t *dx_message_in_delivery(dx_message_t *msg)
+void dx_message_set_delivery_annotations(dx_message_t *msg, dx_composed_field_t *da)
 {
-    dx_message_content_t *content = MSG_CONTENT(msg);
-    return content->in_delivery;
+    dx_message_content_t *content       = MSG_CONTENT(msg);
+    dx_buffer_list_t     *field_buffers = dx_compose_buffers(da);
+
+    assert(DEQ_SIZE(content->new_delivery_annotations) == 0);
+    content->new_delivery_annotations = *field_buffers;
+    DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
 }
 
 
-dx_message_t *dx_message_receive(pn_delivery_t *delivery)
+dx_message_t *dx_message_receive(dx_delivery_t *delivery)
 {
-    pn_link_t        *link = pn_delivery_link(delivery);
-    dx_message_pvt_t *msg  = (dx_message_pvt_t*) pn_delivery_get_context(delivery);
+    pn_delivery_t    *pnd  = dx_delivery_pn(delivery);
+    dx_message_pvt_t *msg  = (dx_message_pvt_t*) dx_delivery_context(delivery);
+    pn_link_t        *link = pn_delivery_link(pnd);
     ssize_t           rc;
     dx_buffer_t      *buf;
 
     //
     // If there is no message associated with the delivery, this is the first time
-    // we've received anything on this delivery.  Allocate a message descriptor and 
+    // we've received anything on this delivery.  Allocate a message descriptor and
     // link it and the delivery together.
     //
     if (!msg) {
         msg = (dx_message_pvt_t*) dx_allocate_message();
-        pn_delivery_set_context(delivery, (void*) msg);
-
-        //
-        // Record the incoming delivery only if it is not settled.  If it is 
-        // settled, it should not be recorded as no future operations on it are
-        // permitted.
-        //
-        if (!pn_delivery_settled(delivery))
-            msg->content->in_delivery = delivery;
+        dx_delivery_set_context(delivery, (void*) msg);
     }
 
     //
@@ -489,6 +540,7 @@ dx_message_t *dx_message_receive(pn_deli
                 DEQ_REMOVE_TAIL(msg->content->buffers);
                 dx_free_buffer(buf);
             }
+            dx_delivery_set_context(delivery, 0);
             return (dx_message_t*) msg;
         }
 
@@ -520,14 +572,76 @@ dx_message_t *dx_message_receive(pn_deli
 }
 
 
-void dx_message_send(dx_message_t *in_msg, pn_link_t *link)
+static void send_handler(void *context, const unsigned char *start, int length)
 {
-    dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
-    dx_buffer_t      *buf = DEQ_HEAD(msg->content->buffers);
+    pn_link_t *pnl = (pn_link_t*) context;
+    pn_link_send(pnl, (const char*) start, length);
+}
+
+
+void dx_message_send(dx_message_t *in_msg, dx_link_t *link)
+{
+    dx_message_pvt_t     *msg     = (dx_message_pvt_t*) in_msg;
+    dx_message_content_t *content = msg->content;
+    dx_buffer_t          *buf     = DEQ_HEAD(content->buffers);
+    unsigned char        *cursor;
+    pn_link_t            *pnl     = dx_link_pn(link);
+
+    if (DEQ_SIZE(content->new_delivery_annotations) > 0) {
+        //
+        // This is the case where the delivery annotations have been modified.
+        // The send is split into three parts: the existing header, the new
+        // delivery annotations, and the rest of the existing message.
+        // Note that the original delivery annotations that are still in the
+        // buffer chain must not be sent.
+        //
+        // Start by making sure that we've parsed the message sections through
+        // the delivery annotations.
+        //
+        if (!dx_message_check(in_msg, DX_DEPTH_DELIVERY_ANNOTATIONS))
+            return;
+
+        //
+        // Send header if present
+        //
+        cursor = dx_buffer_base(buf);
+        if (content->section_message_header.length > 0) {
+            pn_link_send(pnl, (const char*) MSG_HDR_SHORT, 3);
+            buf    = content->section_message_header.buffer;
+            cursor = content->section_message_header.offset + dx_buffer_base(buf);
+            advance(&cursor, &buf, content->section_message_header.length, send_handler, (void*) pnl);
+        }
+
+        //
+        // Send new delivery annotations
+        //
+        dx_buffer_t *da_buf = DEQ_HEAD(content->new_delivery_annotations);
+        while (da_buf) {
+            pn_link_send(pnl, (char*) dx_buffer_base(da_buf), dx_buffer_size(da_buf));
+            da_buf = DEQ_NEXT(da_buf);
+        }
+
+        //
+        // Skip over replaced delivery annotations
+        //
+        if (content->section_delivery_annotation.length > 0)
+            advance(&cursor, &buf,
+                    content->section_delivery_annotation.hdr_length + content->section_delivery_annotation.length,
+                    0, 0);
+
+        //
+        // Send remaining partial buffer
+        //
+        if (buf) {
+            size_t len = dx_buffer_size(buf) - (cursor - dx_buffer_base(buf));
+            advance(&cursor, &buf, len, send_handler, (void*) pnl);
+        }
+
+        // Continue into the loop below, which sends any buffers that remain.
+    }
 
-    // TODO - Handle cases where annotations have been added or modified
     while (buf) {
-        pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
+        pn_link_send(pnl, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
         buf = DEQ_NEXT(buf);
     }
 }
@@ -557,29 +671,6 @@ static int dx_check_field_LH(dx_message_
 
 static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t depth)
 {
-    static const unsigned char * const MSG_HDR_LONG                 = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
-    static const unsigned char * const MSG_HDR_SHORT                = (unsigned char*) "\x00\x53\x70";
-    static const unsigned char * const DELIVERY_ANNOTATION_LONG     = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
-    static const unsigned char * const DELIVERY_ANNOTATION_SHORT    = (unsigned char*) "\x00\x53\x71";
-    static const unsigned char * const MESSAGE_ANNOTATION_LONG      = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
-    static const unsigned char * const MESSAGE_ANNOTATION_SHORT     = (unsigned char*) "\x00\x53\x72";
-    static const unsigned char * const PROPERTIES_LONG              = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
-    static const unsigned char * const PROPERTIES_SHORT             = (unsigned char*) "\x00\x53\x73";
-    static const unsigned char * const APPLICATION_PROPERTIES_LONG  = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
-    static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned char*) "\x00\x53\x74";
-    static const unsigned char * const BODY_DATA_LONG               = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
-    static const unsigned char * const BODY_DATA_SHORT              = (unsigned char*) "\x00\x53\x75";
-    static const unsigned char * const BODY_SEQUENCE_LONG           = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
-    static const unsigned char * const BODY_SEQUENCE_SHORT          = (unsigned char*) "\x00\x53\x76";
-    static const unsigned char * const BODY_VALUE_LONG              = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
-    static const unsigned char * const BODY_VALUE_SHORT             = (unsigned char*) "\x00\x53\x77";
-    static const unsigned char * const FOOTER_LONG                  = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
-    static const unsigned char * const FOOTER_SHORT                 = (unsigned char*) "\x00\x53\x78";
-    static const unsigned char * const TAGS_LIST                    = (unsigned char*) "\x45\xc0\xd0";
-    static const unsigned char * const TAGS_MAP                     = (unsigned char*) "\xc1\xd1";
-    static const unsigned char * const TAGS_BINARY                  = (unsigned char*) "\xa0\xb0";
-    static const unsigned char * const TAGS_ANY                     = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
-
     dx_buffer_t *buffer  = DEQ_HEAD(content->buffers);
 
     if (!buffer)

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,10 +45,11 @@
  */
 
 typedef struct {
-    dx_buffer_t *buffer;  // Buffer that contains the first octet of the field, null if the field is not present
-    size_t       offset;  // Offset in the buffer to the first octet
-    size_t       length;  // Length of the field or zero if unneeded
-    int          parsed;  // non-zero iff the buffer chain has been parsed to find this field
+    dx_buffer_t *buffer;     // Buffer that contains the first octet of the field, null if the field is not present
+    size_t       offset;     // Offset in the buffer to the first octet
+    size_t       length;     // Length of the field or zero if unneeded
+    size_t       hdr_length; // Length of the field's header (not included in the length of the field)
+    int          parsed;     // non-zero iff the buffer chain has been parsed to find this field
 } dx_field_location_t;
 
 
@@ -58,12 +59,15 @@ typedef struct {
 //            1) Received message is held and forwarded unmodified - single buffer list
 //            2) Received message is held and modified before forwarding - two buffer lists
 //            3) Message is composed internally - single buffer list
+// TODO - provide a way to allocate a message without a lock for the link-routing case.
+//        It's likely that link-routing will cause no contention for the message content.
+//
 
 typedef struct {
     sys_mutex_t         *lock;
-    uint32_t             ref_count;                       // The number of qmessages referencing this
+    uint32_t             ref_count;                       // The number of messages referencing this
     dx_buffer_list_t     buffers;                         // The buffer chain containing the message
-    pn_delivery_t       *in_delivery;                     // The delivery on which the message arrived
+    dx_buffer_list_t     new_delivery_annotations;        // The buffer chain containing the new delivery annotations
     dx_field_location_t  section_message_header;          // The message header list
     dx_field_location_t  section_delivery_annotation;     // The delivery annotation map
     dx_field_location_t  section_message_annotation;      // The message annotation map
@@ -78,12 +82,12 @@ typedef struct {
     dx_buffer_t         *parse_buffer;
     unsigned char       *parse_cursor;
     dx_message_depth_t   parse_depth;
+    dx_parsed_field_t   *parsed_delivery_annotations;
 } dx_message_content_t;
 
 typedef struct {
-    DEQ_LINKS(dx_message_t);                              // Deq linkage that overlays the dx_message_t
+    DEQ_LINKS(dx_message_t);   // Deque linkage that overlays the dx_message_t
     dx_message_content_t *content;
-    pn_delivery_t        *out_delivery;
 } dx_message_pvt_t;
 
 ALLOC_DECLARE(dx_message_t);

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/parse.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/parse.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/parse.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/parse.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,13 +37,14 @@ ALLOC_DECLARE(dx_parsed_field_t);
 ALLOC_DEFINE(dx_parsed_field_t);
 
 
-static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count)
+static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count, uint32_t *clen)
 {
     if (dx_field_iterator_end(iter))
         return "Insufficient Data to Determine Tag";
-    *tag    = dx_field_iterator_octet(iter);
-    *count  = 0;
-    *length = 0;
+    *tag      = dx_field_iterator_octet(iter);
+    *count    = 0;
+    *length   = 0;
+    *clen     = 0;
 
     switch (*tag & 0xF0) {
     case 0x40: *length = 0;  break;
@@ -59,7 +60,7 @@ static char *get_type_info(dx_field_iter
         *length += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
         *length += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
         // fall through to the next case
-        
+
     case 0xA0:
     case 0xC0:
     case 0xE0:
@@ -78,19 +79,24 @@ static char *get_type_info(dx_field_iter
         *count += ((unsigned int) dx_field_iterator_octet(iter)) << 24;
         *count += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
         *count += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
+        *clen = 3;
         // fall through to the next case
-        
+
     case 0xC0:
     case 0xE0:
         if (dx_field_iterator_end(iter))
             return "Insufficient Data to Determine Count";
         *count += (unsigned int) dx_field_iterator_octet(iter);
+        *clen += 1;
         break;
     }
 
     if ((*tag == DX_AMQP_MAP8 || *tag == DX_AMQP_MAP32) && (*count & 1))
         return "Odd Number of Elements in a Map";
 
+    if (*clen > *length)
+        return "Insufficient Length to Determine Count";
+
     return 0;
 }
 
@@ -108,13 +114,13 @@ static dx_parsed_field_t *dx_parse_inter
 
     uint32_t length;
     uint32_t count;
+    uint32_t length_of_count;
 
-    field->parse_error = get_type_info(iter, &field->tag, &length, &count);
+    field->parse_error = get_type_info(iter, &field->tag, &length, &count, &length_of_count);
 
     if (!field->parse_error) {
         field->raw_iter = dx_field_iterator_sub(iter, length);
-        if (count == 0 && length > 0)
-            dx_field_iterator_advance(iter, length);
+        dx_field_iterator_advance(iter, length - length_of_count);
         for (uint32_t idx = 0; idx < count; idx++) {
             dx_parsed_field_t *child = dx_parse_internal(field->raw_iter, field);
             DEQ_INSERT_TAIL(field->children, child);
@@ -377,4 +383,3 @@ dx_parsed_field_t *dx_parse_value_by_key
 
     return 0;
 }
-

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/posix/threading.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/posix/threading.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/posix/threading.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/posix/threading.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -103,7 +103,7 @@ void sys_cond_signal_all(sys_cond_t *con
 
 struct sys_thread_t {
     pthread_t thread;
-}; 
+};
 
 sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg)
 {
@@ -123,4 +123,3 @@ void sys_thread_join(sys_thread_t *threa
 {
     pthread_join(thread->thread, 0);
 }
-

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/python_embedded.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/python_embedded.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/python_embedded.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/python_embedded.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -396,6 +396,7 @@ static PyTypeObject LogAdapterType = {
 typedef struct {
     PyObject_HEAD
     PyObject       *handler;
+    PyObject       *handler_rx_call;
     dx_dispatch_t  *dx;
     dx_address_t   *address;
 } IoAdapter;
@@ -403,9 +404,65 @@ typedef struct {
 
 static void dx_io_rx_handler(void *context, dx_message_t *msg)
 {
-    //IoAdapter *self = (IoAdapter*) context;
+    IoAdapter *self = (IoAdapter*) context;
 
-    // TODO - Parse the incoming message and send it to the python handler.
+    //
+    // Parse the message through the body and exit if the message is not well-formed.
+    //
+    if (!dx_message_check(msg, DX_DEPTH_BODY))
+        return;
+
+    //
+    // Get an iterator for the application-properties.  Exit if the message has none.
+    //
+    dx_field_iterator_t *ap = dx_message_field_iterator(msg, DX_FIELD_APPLICATION_PROPERTIES);
+    if (ap == 0)
+        return;
+
+    //
+    // Try to get a map-view of the application-properties.
+    //
+    dx_parsed_field_t *ap_map = dx_parse(ap);
+    if (ap_map == 0 || !dx_parse_ok(ap_map) || !dx_parse_is_map(ap_map)) {
+        dx_field_iterator_free(ap);
+        dx_parse_free(ap_map);
+        return;
+    }
+
+    //
+    // Get an iterator for the body.  Exit if the message has none.
+    //
+    dx_field_iterator_t *body = dx_message_field_iterator(msg, DX_FIELD_BODY);
+    if (body == 0) {
+        dx_field_iterator_free(ap);
+        dx_parse_free(ap_map);
+        return;
+    }
+
+    //
+    // Try to get a map-view of the body.
+    //
+    dx_parsed_field_t *body_map = dx_parse(body);
+    if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) {
+        dx_field_iterator_free(ap);
+        dx_field_iterator_free(body);
+        dx_parse_free(ap_map);
+        dx_parse_free(body_map);
+        return;
+    }
+
+    PyObject *pAP   = dx_field_to_py(ap_map);
+    PyObject *pBody = dx_field_to_py(body_map);
+
+    PyObject *pArgs = PyTuple_New(2);
+    PyTuple_SetItem(pArgs, 0, pAP);
+    PyTuple_SetItem(pArgs, 1, pBody);
+
+    PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs);
+    Py_DECREF(pArgs);
+    if (pValue) {
+        Py_DECREF(pValue);
+    }
 }
 
 
@@ -415,9 +472,14 @@ static int IoAdapter_init(IoAdapter *sel
     if (!PyArg_ParseTuple(args, "Os", &self->handler, &address))
         return -1;
 
+    self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive");
+    if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call))
+        return -1;
+
     Py_INCREF(self->handler);
+    Py_INCREF(self->handler_rx_call);
     self->dx = dispatch;
-    self->address = dx_router_register_address(self->dx, true, address, dx_io_rx_handler, self);
+    self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self);
     return 0;
 }
 
@@ -426,24 +488,35 @@ static void IoAdapter_dealloc(IoAdapter*
 {
     dx_router_unregister_address(self->address);
     Py_DECREF(self->handler);
+    Py_DECREF(self->handler_rx_call);
     self->ob_type->tp_free((PyObject*)self);
 }
 
 
 static PyObject* dx_python_send(PyObject *self, PyObject *args)
 {
-    IoAdapter  *ioa = (IoAdapter*) self;
-    const char *address;
-    PyObject   *app_properties;
-    PyObject   *body;
+    IoAdapter           *ioa   = (IoAdapter*) self;
+    dx_composed_field_t *field = 0;
+    const char          *address;
+    PyObject            *app_properties;
+    PyObject            *body;
+
     if (!PyArg_ParseTuple(args, "sOO", &address, &app_properties, &body))
         return 0;
 
-    dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0);
+    field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field);
+    dx_compose_start_map(field);
+
+    dx_compose_insert_string(field, "qdx.ingress");
+    dx_compose_insert_string(field, dx_router_id(ioa->dx));
+
+    dx_compose_insert_string(field, "qdx.trace");
     dx_compose_start_list(field);
-    dx_compose_insert_bool(field, 0);     // durable
+    dx_compose_insert_string(field, dx_router_id(ioa->dx));
     dx_compose_end_list(field);
 
+    dx_compose_end_map(field);
+
     field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
     dx_compose_start_list(field);
     dx_compose_insert_null(field);            // message-id

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/router_node.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/router_node.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,7 @@
 #include <qpid/dispatch/python_embedded.h>
 #include <stdio.h>
 #include <string.h>
+#include <stdbool.h>
 #include <qpid/dispatch.h>
 #include "dispatch_private.h"
 
@@ -28,123 +29,290 @@ static char *module = "ROUTER";
 static void dx_router_python_setup(dx_router_t *router);
 static void dx_pyrouter_tick(dx_router_t *router);
 
-//static char *local_prefix = "_local/";
-//static char *topo_prefix  = "_topo/";
+static char *router_address = "_local/qdxrouter";
+static char *local_prefix   = "_local/";
+//static char *topo_prefix    = "_topo/";
 
 /**
  * Address Types and Processing:
  *
- *   Address                              Hash Key       onReceive         onEmit
- *   =============================================================================
- *   _local/<local>                       L<local>       handler           forward
- *   _topo/<area>/<router>/<local>        A<area>        forward           forward
- *   _topo/<my-area>/<router>/<local>     R<router>      forward           forward
- *   _topo/<my-area>/<my-router>/<local>  L<local>       forward+handler   forward
- *   _topo/<area>/all/<local>             A<area>        forward           forward
- *   _topo/<my-area>/all/<local>          L<local>       forward+handler   forward
- *   _topo/all/all/<local>                L<local>       forward+handler   forward
- *   <mobile>                             M<mobile>      forward+handler   forward
+ *   Address                              Hash Key       onReceive
+ *   ===================================================================
+ *   _local/<local>                       L<local>               handler
+ *   _topo/<area>/<router>/<local>        A<area>        forward
+ *   _topo/<my-area>/<router>/<local>     R<router>      forward
+ *   _topo/<my-area>/<my-router>/<local>  L<local>               handler
+ *   _topo/<area>/all/<local>             A<area>        forward
+ *   _topo/<my-area>/all/<local>          L<local>       forward handler
+ *   _topo/all/all/<local>                L<local>       forward handler
+ *   <mobile>                             M<mobile>      forward handler
  */
 
-struct dx_router_t {
-    dx_dispatch_t      *dx;
-    const char         *router_area;
-    const char         *router_id;
-    dx_node_t          *node;
-    dx_link_list_t      in_links;
-    dx_link_list_t      out_links;
-    dx_message_list_t   in_fifo;
-    sys_mutex_t        *lock;
-    dx_timer_t         *timer;
-    hash_t             *out_hash;
-    uint64_t            dtag;
-    PyObject           *pyRouter;
-    PyObject           *pyTick;
-};
 
+typedef struct dx_router_link_t dx_router_link_t;
+typedef struct dx_router_node_t dx_router_node_t;
 
-typedef struct {
-    dx_link_t         *link;
-    dx_message_list_t  out_fifo;
-} dx_router_link_t;
+
+typedef enum {
+    DX_LINK_ENDPOINT,   // A link to a connected endpoint
+    DX_LINK_ROUTER,     // A link to a peer router in the same area
+    DX_LINK_AREA        // A link to a peer router in a different area (area boundary)
+} dx_link_type_t;
+
+
+typedef struct dx_routed_event_t {
+    DEQ_LINKS(struct dx_routed_event_t);
+    dx_delivery_t *delivery;
+    dx_message_t  *message;
+    bool           settle;
+    uint64_t       disposition;
+} dx_routed_event_t;
+
+ALLOC_DECLARE(dx_routed_event_t);
+ALLOC_DEFINE(dx_routed_event_t);
+DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
+
+
+struct dx_router_link_t {
+    DEQ_LINKS(dx_router_link_t);
+    dx_direction_t          link_direction;
+    dx_link_type_t          link_type;
+    dx_address_t           *owning_addr;     // [ref] Address record that owns this link
+    dx_link_t              *link;            // [own] Link pointer
+    dx_router_link_t       *connected_link;  // [ref] If this is a link-route, reference the connected link
+    dx_router_link_t       *peer_link;       // [ref] If this is a bidirectional link-route, reference the peer link
+    dx_routed_event_list_t  event_fifo;      // FIFO of outgoing delivery/link events (no messages)
+    dx_routed_event_list_t  msg_fifo;        // FIFO of outgoing message deliveries
+};
 
 ALLOC_DECLARE(dx_router_link_t);
 ALLOC_DEFINE(dx_router_link_t);
+DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
 
-
-typedef struct {
+struct dx_router_node_t {
+    DEQ_LINKS(dx_router_node_t);
     const char       *id;
-    dx_router_link_t *next_hop;
+    dx_router_node_t *next_hop;   // Next hop node _if_ this is not a neighbor node
+    dx_router_link_t *peer_link;  // Outgoing link _if_ this is a neighbor node
     // list of valid origins (pointers to router_node) - (bit masks?)
-} dx_router_node_t;
+};
 
 ALLOC_DECLARE(dx_router_node_t);
 ALLOC_DEFINE(dx_router_node_t);
+DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
 
 
 struct dx_address_t {
-    int                   is_local;
-    dx_router_message_cb  handler;           // In-Process Consumer
-    void                 *handler_context;
-    dx_router_link_t     *rlink;             // Locally-Connected Consumer  - TODO: Make this a list
-    dx_router_node_t     *rnode;             // Remotely-Connected Consumer - TODO: Make this a list
+    dx_router_message_cb   handler;          // In-Process Consumer
+    void                  *handler_context;
+    dx_router_link_list_t  rlinks;           // Locally-Connected Consumers
+    dx_router_node_list_t  rnodes;           // Remotely-Connected Consumers
 };
 
 ALLOC_DECLARE(dx_address_t);
 ALLOC_DEFINE(dx_address_t);
 
 
+struct dx_router_t {
+    dx_dispatch_t         *dx;
+    const char            *router_area;
+    const char            *router_id;
+    dx_node_t             *node;
+    dx_router_link_list_t  in_links;
+    dx_router_node_list_t  routers;
+    dx_message_list_t      in_fifo;
+    sys_mutex_t           *lock;
+    dx_timer_t            *timer;
+    hash_t                *out_hash;
+    uint64_t               dtag;
+    PyObject              *pyRouter;
+    PyObject              *pyTick;
+};
+
+
 /**
- * Outbound Delivery Handler
+ * Outgoing Link Writable Handler
  */
-static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static int router_writable_link_handler(void* context, dx_link_t *link)
 {
-    dx_router_t      *router  = (dx_router_t*) context;
-    pn_link_t        *pn_link = pn_delivery_link(delivery);
-    dx_router_link_t *rlink   = (dx_router_link_t*) dx_link_get_context(link);
-    dx_message_t     *msg;
-    size_t            size;
+    dx_router_t            *router = (dx_router_t*) context;
+    dx_delivery_t          *delivery;
+    dx_router_link_t       *rlink = (dx_router_link_t*) dx_link_get_context(link);
+    pn_link_t              *pn_link = dx_link_pn(link);
+    uint64_t                tag;
+    int                     link_credit = pn_link_credit(pn_link);
+    dx_routed_event_list_t  to_send;
+    dx_routed_event_list_t  events;
+    dx_routed_event_t      *re;
+    size_t                  offer;
+    int                     event_count = 0;
+
+    DEQ_INIT(to_send);
+    DEQ_INIT(events);
 
     sys_mutex_lock(router->lock);
-    msg = DEQ_HEAD(rlink->out_fifo);
-    if (!msg) {
-        // TODO - Recind the delivery
-        sys_mutex_unlock(router->lock);
-        return;
+
+    //
+    // Pull the non-delivery events into a local list so they can be processed without
+    // the lock being held.
+    //
+    re = DEQ_HEAD(rlink->event_fifo);
+    while (re) {
+        DEQ_REMOVE_HEAD(rlink->event_fifo);
+        DEQ_INSERT_TAIL(events, re);
+        re = DEQ_HEAD(rlink->event_fifo);
+    }
+
+    //
+    // Under lock, move available deliveries from the msg_fifo to the local to_send
+    // list.  Don't move more than we have credit to send.
+    //
+    if (link_credit > 0) {
+        tag = router->dtag;
+        re = DEQ_HEAD(rlink->msg_fifo);
+        while (re) {
+            DEQ_REMOVE_HEAD(rlink->msg_fifo);
+            DEQ_INSERT_TAIL(to_send, re);
+            if (DEQ_SIZE(to_send) == link_credit)
+                break;
+            re = DEQ_HEAD(rlink->msg_fifo);
+        }
+        router->dtag += DEQ_SIZE(to_send);
     }
 
-    DEQ_REMOVE_HEAD(rlink->out_fifo);
-    size = (DEQ_SIZE(rlink->out_fifo));
+    offer = DEQ_SIZE(rlink->msg_fifo);
     sys_mutex_unlock(router->lock);
 
-    dx_message_send(msg, pn_link);
+    //
+    // Deliver all the to_send messages downrange
+    //
+    re = DEQ_HEAD(to_send);
+    while (re) {
+        DEQ_REMOVE_HEAD(to_send);
+
+        //
+        // Get a delivery for the send.  This will be the current delivery on the link.
+        //
+        tag++;
+        delivery = dx_delivery(link, pn_dtag((char*) &tag, 8));
+
+        //
+        // Send the message
+        //
+        dx_message_send(re->message, link);
+
+        //
+        // If there is an incoming delivery associated with this message, link it
+        // with the outgoing delivery.  Otherwise, the message arrived pre-settled
+        // and should be sent pre-settled.
+        //
+        if (re->delivery) {
+            dx_delivery_set_peer(re->delivery, delivery);
+            dx_delivery_set_peer(delivery, re->delivery);
+        } else
+            dx_delivery_free(delivery, 0);  // settle and free
+
+        pn_link_advance(pn_link);
+        event_count++;
+
+        dx_free_message(re->message);
+        free_dx_routed_event_t(re);
+        re = DEQ_HEAD(to_send);
+    }
 
     //
-    // If there is no incoming delivery, it was pre-settled.  In this case,
-    // we must pre-settle the outgoing delivery as well.
+    // Process the non-delivery events.
     //
-    if (dx_message_in_delivery(msg)) {
-        pn_delivery_set_context(delivery, (void*) msg);
-        dx_message_set_out_delivery(msg, delivery);
-    } else {
-        pn_delivery_settle(delivery);
-        dx_free_message(msg);
+    re = DEQ_HEAD(events);
+    while (re) {
+        DEQ_REMOVE_HEAD(events);
+
+        if (re->delivery) {
+            if (re->disposition) {
+                pn_delivery_update(dx_delivery_pn(re->delivery), re->disposition);
+                event_count++;
+            }
+            if (re->settle) {
+                dx_delivery_free(re->delivery, 0);
+                event_count++;
+            }
+        }
+
+        free_dx_routed_event_t(re);
+        re = DEQ_HEAD(events);
     }
 
-    pn_link_advance(pn_link);
-    pn_link_offered(pn_link, size);
+    //
+    // Set the offer to the number of messages remaining to be sent.
+    //
+    pn_link_offered(pn_link, offer);
+    return event_count;
+}
+
+
+static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
+{
+    dx_parsed_field_t   *in_da  = dx_message_delivery_annotations(msg);
+    dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+
+    dx_parsed_field_t *trace   = 0;
+    dx_parsed_field_t *ingress = 0;
+
+    if (in_da) {
+        trace   = dx_parse_value_by_key(in_da, "qdx.trace");
+        ingress = dx_parse_value_by_key(in_da, "qdx.ingress");
+    }
+
+    dx_compose_start_map(out_da);
+
+    //
+    // If there is a trace field, append this router's ID to the trace.
+    //
+    if (trace && dx_parse_is_list(trace)) {
+        dx_compose_insert_string(out_da, "qdx.trace");
+        dx_compose_start_list(out_da);
+
+        uint32_t idx = 0;
+        dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
+        while (trace_item) {
+            dx_field_iterator_t *iter = dx_parse_raw(trace_item);
+            dx_compose_insert_string_iterator(out_da, iter);
+            idx++;
+            trace_item = dx_parse_sub_value(trace, idx);
+        }
+
+        dx_compose_insert_string(out_da, router->router_id);
+        dx_compose_end_list(out_da);
+    }
+
+    //
+    // If there is no ingress field, annotate the ingress as this router;
+    // otherwise, keep the original field.
+    //
+    dx_compose_insert_string(out_da, "qdx.ingress");
+    if (ingress && dx_parse_is_scalar(ingress)) {
+        dx_field_iterator_t *iter = dx_parse_raw(ingress);
+        dx_compose_insert_string_iterator(out_da, iter);
+    } else
+        dx_compose_insert_string(out_da, router->router_id);
+
+    dx_compose_end_map(out_da);
+
+    dx_message_set_delivery_annotations(msg, out_da);
+    dx_compose_free(out_da);
 }
 
 
 /**
  * Inbound Delivery Handler
  */
-static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *delivery)
 {
-    dx_router_t  *router  = (dx_router_t*) context;
-    pn_link_t    *pn_link = pn_delivery_link(delivery);
-    dx_message_t *msg;
-    int           valid_message = 0;
+    dx_router_t      *router  = (dx_router_t*) context;
+    pn_link_t        *pn_link = dx_link_pn(link);
+    dx_router_link_t *rlink   = (dx_router_link_t*) dx_link_get_context(link);
+    dx_message_t     *msg;
+    int               valid_message = 0;
 
     //
     // Receive the message into a local representation.  If the returned message
@@ -158,20 +326,63 @@ static void router_rx_handler(void* cont
         return;
 
     //
-    // Validate the message through the Properties section
+    // Consume the delivery and issue a replacement credit
     //
-    valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
-
     pn_link_advance(pn_link);
     pn_link_flow(pn_link, 1);
 
+    sys_mutex_lock(router->lock);
+
+    //
+    // Handle the Link-Routing case.  If this incoming link is associated with a connected
+    // link, simply deliver the message to the outgoing link.  There is no need to validate
+    // the message in this case.
+    //
+    if (rlink->connected_link) {
+        dx_router_link_t  *clink = rlink->connected_link;
+        dx_routed_event_t *re    = new_dx_routed_event_t();
+
+        DEQ_ITEM_INIT(re);
+        re->delivery    = 0;
+        re->message     = msg;
+        re->settle      = false;
+        re->disposition = 0;
+        DEQ_INSERT_TAIL(clink->msg_fifo, re);
+
+        //
+        // If the incoming delivery is settled (pre-settled), don't link it into the routed
+        // event.  If it's not settled, link it into the event for later handling.
+        //
+        if (dx_delivery_settled(delivery))
+            dx_delivery_free(delivery, 0);
+        else
+            re->delivery = delivery;
+
+        sys_mutex_unlock(router->lock);
+        dx_link_activate(clink->link);
+        return;
+    }
+
+    //
+    // We are performing Message-Routing, therefore we will need to validate the message
+    // through the Properties section so we can access the TO field.
+    //
+    dx_message_t         *in_process_copy = 0;
+    dx_router_message_cb  handler         = 0;
+    void                 *handler_context = 0;
+
+    valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
+
     if (valid_message) {
         dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
         dx_address_t        *addr;
+        int                  fanout = 0;
+
         if (iter) {
             dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-            sys_mutex_lock(router->lock);
             hash_retrieve(router->out_hash, iter, (void*) &addr);
+            dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
+            int is_local = dx_field_iterator_prefix(iter, local_prefix);
             dx_field_iterator_free(iter);
 
             if (addr) {
@@ -179,108 +390,145 @@ static void router_rx_handler(void* cont
                 // To field is valid and contains a known destination.  Handle the various
                 // cases for forwarding.
                 //
-                // Forward to the in-process handler for this message if there is one.
-                // Note: If the handler is going to queue the message for deferred processing,
-                //       it must copy the message.  This function assumes that the handler
-                //       will process the message synchronously and be finished with it upon
-                //       completion.
-                //
-                if (addr->handler)
-                    addr->handler(addr->handler_context, msg);
 
                 //
-                // Forward to the local link for the locally-connected consumer, if present.
-                // TODO - Don't forward if this is a "_local" address.
+                // Interpret and update the delivery annotations of the message
                 //
-                if (addr->rlink) {
-                    pn_link_t    *pn_outlink = dx_link_pn(addr->rlink->link);
-                    dx_message_t *copy       = dx_message_copy(msg);
-                    DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
-                    pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
-                    dx_link_activate(addr->rlink->link);
-                }
+                router_annotate_message(router, msg);
 
                 //
-                // Forward to the next-hop for a remotely-connected consumer, if present.
-                // Don't forward if this is a "_local" address.
+                // Forward to the in-process handler for this message if there is one.  The
+                // actual invocation of the handler will occur later after we've released
+                // the lock.
                 //
-                if (addr->rnode) {
-                    // TODO
+                if (addr->handler) {
+                    in_process_copy = dx_message_copy(msg);
+                    handler         = addr->handler;
+                    handler_context = addr->handler_context;
                 }
 
-            } else {
                 //
-                // To field contains an unknown address.  Release the message.
+                // If the address form is local (i.e. is prefixed by _local), don't forward
+                // outside of the router process.
                 //
-                pn_delivery_update(delivery, PN_RELEASED);
-                pn_delivery_settle(delivery);
+                if (!is_local) {
+                    //
+                    // Forward to all of the local links receiving this address.
+                    //
+                    dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
+                    while (dest_link) {
+                        dx_routed_event_t *re = new_dx_routed_event_t();
+                        DEQ_ITEM_INIT(re);
+                        re->delivery    = 0;
+                        re->message     = dx_message_copy(msg);
+                        re->settle      = 0;
+                        re->disposition = 0;
+                        DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+                        fanout++;
+                        if (fanout == 1 && !dx_delivery_settled(delivery))
+                            re->delivery = delivery;
+
+                        dx_link_activate(dest_link->link);
+                        dest_link = DEQ_NEXT(dest_link);
+                    }
+
+                    //
+                    // Forward to the next-hops for remote destinations.
+                    //
+                    dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
+                    while (dest_node) {
+                        if (dest_node->next_hop)
+                            dest_link = dest_node->next_hop->peer_link;
+                        else
+                            dest_link = dest_node->peer_link;
+                        if (dest_link) {
+                            dx_routed_event_t *re = new_dx_routed_event_t();
+                            DEQ_ITEM_INIT(re);
+                            re->delivery    = 0;
+                            re->message     = dx_message_copy(msg);
+                            re->settle      = 0;
+                            re->disposition = 0;
+                            DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+                            fanout++;
+                            if (fanout == 1)
+                                re->delivery = delivery;
+
+                            dx_link_activate(dest_link->link);
+                        }
+                        dest_node = DEQ_NEXT(dest_node);
+                    }
+                }
             }
 
-            sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
-            dx_free_message(msg);
-
             //
-            // If this was a pre-settled delivery, we must also locally settle it.
+            // In message-routing mode, the handling of the incoming delivery depends on the
+            // number of copies of the received message that were forwarded.
             //
-            if (pn_delivery_settled(delivery))
-                pn_delivery_settle(delivery);
+            if (handler) {
+                dx_delivery_free(delivery, PN_ACCEPTED);
+            } else if (fanout == 0) {
+                dx_delivery_free(delivery, PN_RELEASED);
+            } else if (fanout > 1)
+                dx_delivery_free(delivery, PN_ACCEPTED);
         }
     } else {
         //
         // Message is invalid.  Reject the message.
         //
-        pn_delivery_update(delivery, PN_REJECTED);
-        pn_delivery_settle(delivery);
-        pn_delivery_set_context(delivery, 0);
-        dx_free_message(msg);
+        dx_delivery_free(delivery, PN_REJECTED);
     }
+
+    sys_mutex_unlock(router->lock);
+    dx_free_message(msg);
+
+    //
+    // Invoke the in-process handler now that the lock is released.
+    //
+    if (handler)
+        handler(handler_context, in_process_copy);
 }
 
 
 /**
  * Delivery Disposition Handler
  */
-static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
+static void router_disp_handler(void* context, dx_link_t *link, dx_delivery_t *delivery)
 {
-    pn_link_t *pn_link = pn_delivery_link(delivery);
-
-    if (pn_link_is_sender(pn_link)) {
-        uint64_t       disp     = pn_delivery_remote_state(delivery);
-        dx_message_t  *msg      = pn_delivery_get_context(delivery);
-        pn_delivery_t *activate = 0;
-
-        if (msg) {
-            assert(delivery == dx_message_out_delivery(msg));
-            if (disp != 0) {
-                activate = dx_message_in_delivery(msg);
-                pn_delivery_update(activate, disp);
-                // TODO - handling of the data accompanying RECEIVED/MODIFIED
-            }
+    dx_router_t   *router  = (dx_router_t*) context;
+    bool           changed = dx_delivery_disp_changed(delivery);
+    uint64_t       disp    = dx_delivery_disp(delivery);
+    bool           settled = dx_delivery_settled(delivery);
+    dx_delivery_t *peer    = dx_delivery_peer(delivery);
 
-            if (pn_delivery_settled(delivery)) {
-                //
-                // Downstream delivery has been settled.  Propagate the settlement
-                // upstream.
-                //
-                activate = dx_message_in_delivery(msg);
-                pn_delivery_settle(activate);
-                pn_delivery_settle(delivery);
-                dx_free_message(msg);
-            }
+    if (peer) {
+        //
+        // The case where this delivery has a peer.
+        //
+        if (changed || settled) {
+            dx_link_t         *peer_link = dx_delivery_link(peer);
+            dx_router_link_t  *prl       = (dx_router_link_t*) dx_link_get_context(peer_link);
+            dx_routed_event_t *re        = new_dx_routed_event_t();
+            DEQ_ITEM_INIT(re);
+            re->delivery    = peer;
+            re->message     = 0;
+            re->settle      = settled;
+            re->disposition = changed ? disp : 0;
 
-            if (activate) {
-                //
-                // Activate the upstream/incoming link so that the settlement will
-                // get pushed out.
-                //
-                dx_link_t *act_link = (dx_link_t*) pn_link_get_context(pn_delivery_link(activate));
-                dx_link_activate(act_link);
-            }
+            sys_mutex_lock(router->lock);
+            DEQ_INSERT_TAIL(prl->event_fifo, re);
+            sys_mutex_unlock(router->lock);
 
-            return;
+            dx_link_activate(peer_link);
         }
+
     } else {
-        // TODO - Handle disposition updates from upstream
+        //
+        // The no-peer case.  Ignore status changes and echo settlement.
+        //
+        if (settled)
+            dx_delivery_free(delivery, 0);
     }
 }
 
@@ -290,25 +538,36 @@ static void router_disp_handler(void* co
  */
 static int router_incoming_link_handler(void* context, dx_link_t *link)
 {
-    dx_router_t    *router  = (dx_router_t*) context;
-    dx_link_item_t *item    = new_dx_link_item_t();
-    pn_link_t      *pn_link = dx_link_pn(link);
-
-    if (item) {
-        DEQ_ITEM_INIT(item);
-        item->link = link;
+    dx_router_t      *router  = (dx_router_t*) context;
+    dx_router_link_t *rlink   = new_dx_router_link_t();
+    pn_link_t        *pn_link = dx_link_pn(link);
 
-        sys_mutex_lock(router->lock);
-        DEQ_INSERT_TAIL(router->in_links, item);
-        sys_mutex_unlock(router->lock);
+    DEQ_ITEM_INIT(rlink);
+    rlink->link_direction = DX_INCOMING;
+    rlink->link_type      = DX_LINK_ENDPOINT;
+    rlink->owning_addr    = 0;
+    rlink->link           = link;
+    rlink->connected_link = 0;
+    rlink->peer_link      = 0;
+    DEQ_INIT(rlink->event_fifo);
+    DEQ_INIT(rlink->msg_fifo);
+
+    dx_link_set_context(link, rlink);
+
+    sys_mutex_lock(router->lock);
+    DEQ_INSERT_TAIL(router->in_links, rlink);
+    sys_mutex_unlock(router->lock);
+
+    pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+    pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+    pn_link_flow(pn_link, 1000);
+    pn_link_open(pn_link);
+
+    //
+    // TODO - If the address has link-route semantics, create all associated
+    //        links needed to go with this one.
+    //
 
-        pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
-        pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
-        pn_link_flow(pn_link, 1000);
-        pn_link_open(pn_link);
-    } else {
-        pn_link_close(pn_link);
-    }
     return 0;
 }
 
@@ -327,73 +586,45 @@ static int router_outgoing_link_handler(
         return 0;
     }
 
-    dx_router_link_t *rlink = new_dx_router_link_t();
-    rlink->link = link;
-    DEQ_INIT(rlink->out_fifo);
+    dx_field_iterator_t *iter  = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+    dx_router_link_t    *rlink = new_dx_router_link_t();
+
+    int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address);
+
+    DEQ_ITEM_INIT(rlink);
+    rlink->link_direction = DX_OUTGOING;
+    rlink->link_type      = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
+    rlink->link           = link;
+    rlink->connected_link = 0;
+    rlink->peer_link      = 0;
+    DEQ_INIT(rlink->event_fifo);
+    DEQ_INIT(rlink->msg_fifo);
+
     dx_link_set_context(link, rlink);
 
+    dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
     dx_address_t *addr;
 
-    dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
-
     sys_mutex_lock(router->lock);
     hash_retrieve(router->out_hash, iter, (void**) &addr);
     if (!addr) {
         addr = new_dx_address_t();
-        addr->is_local        = 0;
         addr->handler         = 0;
         addr->handler_context = 0;
-        addr->rlink           = 0;
-        addr->rnode           = 0;
+        DEQ_INIT(addr->rlinks);
+        DEQ_INIT(addr->rnodes);
         hash_insert(router->out_hash, iter, addr);
     }
     dx_field_iterator_free(iter);
 
-    if (addr->rlink == 0) {
-        addr->rlink = rlink;
-        pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
-        pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
-        pn_link_open(pn_link);
-        sys_mutex_unlock(router->lock);
-        dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
-        return 0;
-    }
-
-    dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt);
-    pn_link_close(pn_link);
-    sys_mutex_unlock(router->lock);
-    return 0;
-}
-
-
-/**
- * Outgoing Link Writable Handler
- */
-static int router_writable_link_handler(void* context, dx_link_t *link)
-{
-    dx_router_t      *router = (dx_router_t*) context;
-    int               grant_delivery = 0;
-    pn_delivery_t    *delivery;
-    dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
-    pn_link_t        *pn_link = dx_link_pn(link);
-    uint64_t          tag;
+    rlink->owning_addr = addr;
+    DEQ_INSERT_TAIL(addr->rlinks, rlink);
 
-    sys_mutex_lock(router->lock);
-    if (DEQ_SIZE(rlink->out_fifo) > 0) {
-        grant_delivery = 1;
-        tag = router->dtag++;
-    }
+    pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+    pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+    pn_link_open(pn_link);
     sys_mutex_unlock(router->lock);
-
-    if (grant_delivery) {
-        pn_delivery(pn_link, pn_dtag((char*) &tag, 8));
-        delivery = pn_link_current(pn_link);
-        if (delivery) {
-            router_tx_handler(context, link, delivery);
-            return 1;
-        }
-    }
-
+    dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
     return 0;
 }
 
@@ -403,44 +634,37 @@ static int router_writable_link_handler(
  */
 static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
 {
-    dx_router_t    *router  = (dx_router_t*) context;
-    pn_link_t      *pn_link = dx_link_pn(link);
-    const char     *r_tgt   = pn_terminus_get_address(pn_link_remote_target(pn_link));
-    dx_link_item_t *item;
+    dx_router_t      *router  = (dx_router_t*) context;
+    pn_link_t        *pn_link = dx_link_pn(link);
+    dx_router_link_t *rlink   = (dx_router_link_t*) dx_link_get_context(link);
+    const char       *r_tgt   = pn_terminus_get_address(pn_link_remote_target(pn_link));
 
-    if (!r_tgt)
+    if (!rlink)
         return 0;
 
     sys_mutex_lock(router->lock);
     if (pn_link_is_sender(pn_link)) {
-        item = DEQ_HEAD(router->out_links);
+        DEQ_REMOVE(rlink->owning_addr->rlinks, rlink);
 
-        dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
-        dx_address_t        *addr;
-        if (iter) {
-            hash_retrieve(router->out_hash, iter, (void**) &addr);
-            if (addr) {
-                hash_remove(router->out_hash, iter);
-                free_dx_router_link_t(addr->rlink);
-                free_dx_address_t(addr);
-                dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+        if ((rlink->owning_addr->handler == 0) &&
+            (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) &&
+            (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) {
+            dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
+            dx_address_t        *addr;
+            if (iter) {
+                hash_retrieve(router->out_hash, iter, (void**) &addr);
+                if (addr == rlink->owning_addr) {
+                    hash_remove(router->out_hash, iter);
+                    free_dx_router_link_t(rlink);
+                    free_dx_address_t(addr);
+                    dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+                }
+                dx_field_iterator_free(iter);
             }
-            dx_field_iterator_free(iter);
-        }
-    }
-    else
-        item = DEQ_HEAD(router->in_links);
-
-    while (item) {
-        if (item->link == link) {
-            if (pn_link_is_sender(pn_link))
-                DEQ_REMOVE(router->out_links, item);
-            else
-                DEQ_REMOVE(router->in_links, item);
-            free_dx_link_item_t(item);
-            break;
         }
-        item = item->next;
+    } else {
+        DEQ_REMOVE(router->in_links, rlink);
+        free_dx_router_link_t(rlink);
     }
 
     sys_mutex_unlock(router->lock);
@@ -455,6 +679,97 @@ static void router_inbound_open_handler(
 
+/**
+ * Outbound Connection Open Handler
+ *
+ * Builds the pair of inter-router links for 'conn'.  The incoming link is
+ * appended to router->in_links; the outgoing link is attached to the out_hash
+ * entry keyed by router_address, which is created here when absent.  Both
+ * links are then opened and the incoming link is given 1000 credits.
+ * 'type_context' is the dx_router_t that owns the tables.
+ */
 static void router_outbound_open_handler(void *type_context, dx_connection_t *conn)
 {
+    // TODO - Make sure this connection is annotated as an inter-router transport.
+    //        Ignore otherwise
+
+    dx_router_t         *router = (dx_router_t*) type_context;
+    dx_field_iterator_t *aiter  = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH);
+    dx_link_t           *sender;
+    dx_link_t           *receiver;
+    dx_router_link_t    *rlink;
+
+    //
+    // Create an incoming link and put it in the in-links collection.  The address
+    // of the remote source of this link is '_local/qdxrouter'.
+    //
+    receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx");
+    pn_terminus_set_address(dx_link_remote_source(receiver), router_address);
+    pn_terminus_set_address(dx_link_target(receiver), router_address);
+
+    rlink = new_dx_router_link_t();
+
+    DEQ_ITEM_INIT(rlink);
+    rlink->link_direction = DX_INCOMING;
+    rlink->link_type      = DX_LINK_ROUTER;
+    rlink->owning_addr    = 0;
+    rlink->link           = receiver;
+    rlink->connected_link = 0;
+    rlink->peer_link      = 0;
+    DEQ_INIT(rlink->event_fifo);
+    DEQ_INIT(rlink->msg_fifo);
+
+    dx_link_set_context(receiver, rlink);
+
+    sys_mutex_lock(router->lock);
+    DEQ_INSERT_TAIL(router->in_links, rlink);
+    sys_mutex_unlock(router->lock);
+
+    //
+    // Create an outgoing link with a local source of '_local/qdxrouter' and place
+    // it in the routing table.
+    //
+    sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx");
+    pn_terminus_set_address(dx_link_remote_target(sender), router_address);
+    pn_terminus_set_address(dx_link_source(sender), router_address);
+
+    rlink = new_dx_router_link_t();
+
+    DEQ_ITEM_INIT(rlink);
+    rlink->link_direction = DX_OUTGOING;
+    rlink->link_type      = DX_LINK_ROUTER;
+    rlink->link           = sender;
+    rlink->connected_link = 0;
+    rlink->peer_link      = 0;
+    DEQ_INIT(rlink->event_fifo);
+    DEQ_INIT(rlink->msg_fifo);
+
+    dx_link_set_context(sender, rlink);
+
+    // Start from null so the miss test below never reads an indeterminate
+    // pointer, whatever hash_retrieve does with 'addr' when the key is absent.
+    dx_address_t *addr = 0;
+
+    sys_mutex_lock(router->lock);
+    hash_retrieve(router->out_hash, aiter, (void**) &addr);
+    if (!addr) {
+        addr = new_dx_address_t();
+        addr->handler         = 0;
+        addr->handler_context = 0;
+        DEQ_INIT(addr->rlinks);
+        DEQ_INIT(addr->rnodes);
+        hash_insert(router->out_hash, aiter, addr);
+    }
+
+    rlink->owning_addr = addr;
+    DEQ_INSERT_TAIL(addr->rlinks, rlink);
+    sys_mutex_unlock(router->lock);
+
+    //
+    // Open both links and issue 1000 credits on the receiver.
+    //
+    pn_link_open(dx_link_pn(receiver));
+    pn_link_open(dx_link_pn(sender));
+    pn_link_flow(dx_link_pn(receiver), 1000);
+    dx_field_iterator_free(aiter);
 }
 
 
@@ -473,7 +774,6 @@ static void dx_router_timer_handler(void
 
 static dx_node_type_t router_node = {"router", 0, 0,
                                      router_rx_handler,
-                                     router_tx_handler,
                                      router_disp_handler,
                                      router_incoming_link_handler,
                                      router_outgoing_link_handler,
@@ -494,22 +794,23 @@ dx_router_t *dx_router(dx_dispatch_t *dx
     }
 
     dx_router_t *router = NEW(dx_router_t);
-    dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
 
+    router_node.type_context = router;
+
+    router->dx           = dx;
+    router->router_area  = area;
+    router->router_id    = id;
+    router->node         = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
     DEQ_INIT(router->in_links);
-    DEQ_INIT(router->out_links);
+    DEQ_INIT(router->routers);
     DEQ_INIT(router->in_fifo);
+    router->lock         = sys_mutex();
+    router->timer        = dx_timer(dx, dx_router_timer_handler, (void*) router);
+    router->out_hash     = hash(10, 32, 0);
+    router->dtag         = 1;
+    router->pyRouter     = 0;
+    router->pyTick       = 0;
 
-    router->dx          = dx;
-    router->lock        = sys_mutex();
-    router->router_area = area;
-    router->router_id   = id;
-
-    router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
-
-    router->out_hash = hash(10, 32, 0);
-    router->dtag     = 1;
-    router->pyRouter = 0;
 
     //
     // Inform the field iterator module of this router's id and area.  The field iterator
@@ -546,47 +847,75 @@ void dx_router_free(dx_router_t *router)
 }
 
 
+/**
+ * Return the router_id string of the router attached to 'dx'.
+ *
+ * The pointer returned is the router's own router_id field, not a copy.
+ */
+const char *dx_router_id(const dx_dispatch_t *dx)
+{
+    dx_router_t *router = dx->router;
+    return router->router_id;
+}
+
+
+/**
+ * Register an in-process message handler for 'address'.
+ *
+ * The key is "L" followed by 'address' (Local Hash-Key Space).  The matching
+ * record in the router's out_hash is reused, or created and inserted when
+ * absent, and its handler and handler_context are then set to the arguments.
+ *
+ * Returns the address record, or 0 if the key would not fit in the
+ * 1000-byte key buffer.
+ */
 dx_address_t *dx_router_register_address(dx_dispatch_t        *dx,
-                                         bool                  is_local,
                                          const char           *address,
                                          dx_router_message_cb  handler,
                                          void                 *context)
 {
-    char                 addr[1000];
-    dx_address_t        *ad = new_dx_address_t();
+    char                 addr_string[1000];
+    dx_router_t         *router = dx->router;
+    dx_address_t        *addr   = 0;
     dx_field_iterator_t *iter;
-    int                  result;
 
-    if (!ad)
-        return 0;
+    //
+    // "L" + address + terminator must fit; refuse rather than overrun addr_string.
+    //
+    if (strlen(address) + 2 > sizeof(addr_string))
+        return 0;
+
+    strcpy(addr_string, "L");  // Local Hash-Key Space
+    strcat(addr_string, address);
+    iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
 
-    ad->is_local        = is_local;
-    ad->handler         = handler;
-    ad->handler_context = context;
-    ad->rlink           = 0;
-
-    if (ad->is_local)
-        strcpy(addr, "L");  // Local Hash-Key Space
-    else
-        strcpy(addr, "M");  // Mobile Hash-Key Space
-
-    strcat(addr, address);
-    iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST);
-    result = hash_insert(dx->router->out_hash, iter, ad);
-    dx_field_iterator_free(iter);
-    if (result != 0) {
-        free_dx_address_t(ad);
-        return 0;
+    sys_mutex_lock(router->lock);
+    hash_retrieve(router->out_hash, iter, (void**) &addr);
+    if (!addr) {
+        addr = new_dx_address_t();
+        addr->handler         = 0;
+        addr->handler_context = 0;
+        DEQ_INIT(addr->rlinks);
+        DEQ_INIT(addr->rnodes);
+        hash_insert(router->out_hash, iter, addr);
     }
+    dx_field_iterator_free(iter);
+
+    addr->handler         = handler;
+    addr->handler_context = context;
+
+    sys_mutex_unlock(router->lock);
 
     dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
-    return ad;
+    return addr;
 }
 
 
 void dx_router_unregister_address(dx_address_t *ad)
 {
-    free_dx_address_t(ad);
+    // NOTE(review): release is disabled, so this call currently does nothing;
+    // the record stays in out_hash with its handler set - confirm intended.
+    //free_dx_address_t(ad);
 }
 
 
@@ -601,12 +907,43 @@ void dx_router_send(dx_dispatch_t       
     sys_mutex_lock(router->lock);
     hash_retrieve(router->out_hash, address, (void*) &addr);
     if (addr) {
-        if (addr->rlink) {
-            pn_link_t    *pn_outlink = dx_link_pn(addr->rlink->link);
-            dx_message_t *copy       = dx_message_copy(msg);
-            DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
-            pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
-            dx_link_activate(addr->rlink->link);
+        //
+        // Forward to all of the local links receiving this address.
+        //
+        dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
+        while (dest_link) {
+            dx_routed_event_t *re = new_dx_routed_event_t();
+            DEQ_ITEM_INIT(re);
+            re->delivery    = 0;
+            re->message     = dx_message_copy(msg);
+            re->settle      = 0;
+            re->disposition = 0;
+            DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+            dx_link_activate(dest_link->link);
+            dest_link = DEQ_NEXT(dest_link);
+        }
+
+        //
+        // Forward to the next-hops for remote destinations.
+        //
+        dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
+        while (dest_node) {
+            if (dest_node->next_hop)
+                dest_link = dest_node->next_hop->peer_link;
+            else
+                dest_link = dest_node->peer_link;
+            if (dest_link) {
+                dx_routed_event_t *re = new_dx_routed_event_t();
+                DEQ_ITEM_INIT(re);
+                re->delivery    = 0;
+                re->message     = dx_message_copy(msg);
+                re->settle      = 0;
+                re->disposition = 0;
+                DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+                dx_link_activate(dest_link->link);
+            }
+            dest_node = DEQ_NEXT(dest_node);
         }
     }
     sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
@@ -633,6 +970,30 @@ typedef struct {
 } RouterAdapter;
 
 
+/**
+ * Python-callable 'node_updated' method of the router adapter.
+ *
+ * Unpacks (address, is_reachable, is_neighbor) from 'args' using the "sii"
+ * format.  The values are not acted on yet.  Returns None, or 0 if the
+ * arguments do not parse.
+ */
+static PyObject* dx_router_node_updated(PyObject *self, PyObject *args)
+{
+    //RouterAdapter *adapter = (RouterAdapter*) self;
+    //dx_router_t   *router  = adapter->router;
+    const char    *address;
+    int            is_reachable;
+    int            is_neighbor;
+
+    if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor))
+        return 0;
+
+    // TODO - act on the parsed address and flags
+
+    Py_RETURN_NONE;
+}
+
+
 static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
 {
     //RouterAdapter *adapter = (RouterAdapter*) self;
@@ -666,8 +1021,10 @@ static PyObject* dx_router_del_route(PyO
 
 
 static PyMethodDef RouterAdapter_methods[] = {
-    {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
-    {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
+    // Row layout: Python-visible name, C implementation, call flags, docstring.
+    {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"},
+    {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
+    {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
     {0, 0, 0, 0}
 };
 
@@ -747,7 +1103,7 @@ static void dx_router_python_setup(dx_ro
     PyObject* pClass;
     PyObject* pArgs;
 
-    pName   = PyString_FromString("router");
+    pName   = PyString_FromString("qpid.dispatch.router");
     pModule = PyImport_Import(pName);
     Py_DECREF(pName);
     if (!pModule) {
@@ -809,14 +1165,16 @@ static void dx_pyrouter_tick(dx_router_t
     PyObject *pArgs;
     PyObject *pValue;
 
-    pArgs  = PyTuple_New(0);
-    pValue = PyObject_CallObject(router->pyTick, pArgs);
-    if (PyErr_Occurred()) {
-        PyErr_Print();
-    }
-    Py_DECREF(pArgs);
-    if (pValue) {
-        Py_DECREF(pValue);
+    if (router->pyTick) {
+        pArgs  = PyTuple_New(0);
+        pValue = PyObject_CallObject(router->pyTick, pArgs);
+        if (PyErr_Occurred()) {
+            PyErr_Print();
+        }
+        Py_DECREF(pArgs);
+        if (pValue) {
+            Py_DECREF(pValue);
+        }
     }
 }
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/server.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/server.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/server.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/server.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -91,26 +91,32 @@ static dx_thread_t *thread(dx_server_t *
 }
 
 
-static void thread_process_listeners(pn_driver_t *driver)
+static void thread_process_listeners(dx_server_t *dx_server)
 {
+    pn_driver_t     *driver   = dx_server->driver;
     pn_listener_t   *listener = pn_driver_listener(driver);
     pn_connector_t  *cxtr;
     dx_connection_t *ctx;
 
     while (listener) {
-        dx_log(module, LOG_TRACE, "Accepting Connection");
         cxtr = pn_listener_accept(listener);
+        dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr));
         ctx = new_dx_connection_t();
         ctx->state        = CONN_STATE_OPENING;
         ctx->owner_thread = CONTEXT_NO_OWNER;
         ctx->enqueued     = 0;
         ctx->pn_cxtr      = cxtr;
-        ctx->pn_conn      = 0;
         ctx->listener     = (dx_listener_t*) pn_listener_context(listener);
         ctx->connector    = 0;
         ctx->context      = ctx->listener->context;
         ctx->ufd          = 0;
 
+        pn_connection_t *conn = pn_connection();
+        pn_connection_set_container(conn, dx_server->container_name);
+        pn_connector_set_connection(cxtr, conn);
+        pn_connection_set_context(conn, ctx);
+        ctx->pn_conn = conn;
+
         //
         // Get a pointer to the transport so we can insert security components into it
         //
@@ -201,20 +207,12 @@ static int process_connector(dx_server_t
         // Call the handler that is appropriate for the connector's state.
         //
         switch (ctx->state) {
-        case CONN_STATE_CONNECTING:
-            if (!pn_connector_closed(cxtr)) {
-                //ctx->state = CONN_STATE_SASL_CLIENT;
-                assert(ctx->connector);
-                ctx->connector->state = CXTR_STATE_OPEN;
-                events = 1;
-            } else {
+        case CONN_STATE_CONNECTING: {
+            if (pn_connector_closed(cxtr)) {
                 ctx->state = CONN_STATE_FAILED;
                 events = 0;
+                break;
             }
-            break;
-
-        case CONN_STATE_OPENING:
-            ctx->state = CONN_STATE_OPERATIONAL;
 
             pn_connection_t *conn = pn_connection();
             pn_connection_set_container(conn, dx_server->container_name);
@@ -222,20 +220,71 @@ static int process_connector(dx_server_t
             pn_connection_set_context(conn, ctx);
             ctx->pn_conn = conn;
 
-            dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+            pn_transport_t           *tport  = pn_connector_transport(cxtr);
+            const dx_server_config_t *config = ctx->connector->config;
 
-            if (ctx->listener) {
-                ce = DX_CONN_EVENT_LISTENER_OPEN;
-            } else if (ctx->connector) {
-                ce = DX_CONN_EVENT_CONNECTOR_OPEN;
-                ctx->connector->delay = 0;
-            } else
-                assert(0);
+            //
+            // Set up SSL if appropriate
+            //
+            if (config->ssl_enabled) {
+                pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
+                pn_ssl_domain_set_credentials(domain,
+                                              config->ssl_certificate_file,
+                                              config->ssl_private_key_file,
+                                              config->ssl_password);
+
+                if (config->ssl_require_peer_authentication)
+                    pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db);
+
+                pn_ssl_t *ssl = pn_ssl(tport);
+                pn_ssl_init(ssl, domain, 0);
+                pn_ssl_domain_free(domain);
+            }
 
-            dx_server->conn_handler(dx_server->conn_handler_context,
-                                    ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+            //
+            // Set up SASL
+            //
+            pn_sasl_t *sasl = pn_sasl(tport);
+            pn_sasl_mechanisms(sasl, config->sasl_mechanisms);
+            pn_sasl_client(sasl);
+
+            ctx->state = CONN_STATE_OPENING;
+            assert(ctx->connector);
+            ctx->connector->state = CXTR_STATE_OPEN;
             events = 1;
             break;
+        }
+
+        case CONN_STATE_OPENING: {
+            pn_transport_t *tport = pn_connector_transport(cxtr);
+            pn_sasl_t      *sasl  = pn_sasl(tport);
+
+            if (pn_sasl_outcome(sasl) == PN_SASL_OK) {
+                ctx->state = CONN_STATE_OPERATIONAL;
+
+                dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+
+                if (ctx->listener) {
+                    ce = DX_CONN_EVENT_LISTENER_OPEN;
+                } else if (ctx->connector) {
+                    ce = DX_CONN_EVENT_CONNECTOR_OPEN;
+                    ctx->connector->delay = 0;
+                } else
+                    assert(0);
+
+                dx_server->conn_handler(dx_server->conn_handler_context,
+                                        ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+                events = 1;
+                break;
+            }
+            else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) {
+                ctx->state = CONN_STATE_FAILED;
+                if (ctx->connector) {
+                    const dx_server_config_t *config = ctx->connector->config;
+                    dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+                }
+            }
+        }
 
         case CONN_STATE_OPERATIONAL:
             if (pn_connector_closed(cxtr)) {
@@ -323,20 +372,35 @@ static void *thread_run(void *arg)
         //
         // Service pending timers.
         //
-        dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
-        if (timer) {
-            DEQ_REMOVE_HEAD(dx_server->pending_timers);
-
-            //
-            // Mark the timer as idle in case it reschedules itself.
-            //
-            dx_timer_idle_LH(timer);
+        if (DEQ_SIZE(dx_server->pending_timers) > 0) {
+            dx_timer_list_t local_list;
+            dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
+
+            DEQ_INIT(local_list);
+            while (timer) {
+                DEQ_REMOVE_HEAD(dx_server->pending_timers);
+                DEQ_INSERT_TAIL(local_list, timer);
+                timer = DEQ_HEAD(dx_server->pending_timers);
+            }
 
             //
-            // Release the lock and invoke the connection handler.
+            // Release the lock and invoke the connection handlers.
             //
             sys_mutex_unlock(dx_server->lock);
-            timer->handler(timer->context);
+
+            timer = DEQ_HEAD(local_list);
+            while (timer) {
+                DEQ_REMOVE_HEAD(local_list);
+
+                //
+                // Mark the timer as idle in case it reschedules itself.
+                //
+                dx_timer_idle_LH(timer);
+
+                timer->handler(timer->context);
+                timer = DEQ_HEAD(local_list);
+            }
+
             pn_driver_wakeup(dx_server->driver);
             continue;
         }
@@ -411,7 +475,7 @@ static void *thread_run(void *arg)
                 //
                 // Process listeners (incoming connections).
                 //
-                thread_process_listeners(dx_server->driver);
+                thread_process_listeners(dx_server);
 
                 //
                 // Traverse the list of connectors-needing-service from the proton driver.

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/server_private.h?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/server_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/server_private.h Fri Sep 20 18:59:30 2013
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/timer.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/timer.c?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/timer.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/timer.c Fri Sep 20 18:59:30 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org