You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/25 13:57:15 UTC

qpid-dispatch git commit: DISPATCH-1123 - Completed core-endpoint API, added internal test endpoints enabled from the command line, added tests.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 12692a781 -> d5bbd422c


DISPATCH-1123 - Completed core-endpoint API, added internal test endpoints enabled from the command line, added tests.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d5bbd422
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d5bbd422
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d5bbd422

Branch: refs/heads/master
Commit: d5bbd422ce040267e72b97ef8a36974dd57d3f4a
Parents: 12692a7
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Sep 25 09:45:12 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Sep 25 09:45:12 2018 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/dispatch.h      |   4 +-
 router/src/main.c                     |  21 +-
 src/CMakeLists.txt                    |   1 +
 src/dispatch.c                        |   3 +-
 src/dispatch_private.h                |   1 +
 src/router_core/connections.c         | 160 +++++------
 src/router_core/core_link_endpoint.c  |  59 ++++-
 src/router_core/core_link_endpoint.h  | 128 +++++----
 src/router_core/core_test_hooks.c     | 408 +++++++++++++++++++++++++++++
 src/router_core/core_test_hooks.h     |  28 ++
 src/router_core/router_core_private.h |  10 +
 src/router_core/router_core_thread.c  |   3 +
 src/router_core/transfer.c            |  19 +-
 tests/CMakeLists.txt                  |   1 +
 tests/run_unit_tests.c                |   2 +-
 tests/system_test.py                  |   4 +-
 tests/system_tests_core_endpoint.py   | 231 ++++++++++++++++
 17 files changed, 942 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/include/qpid/dispatch/dispatch.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/dispatch.h b/include/qpid/dispatch/dispatch.h
index 1e8b360..3b8f79e 100644
--- a/include/qpid/dispatch/dispatch.h
+++ b/include/qpid/dispatch/dispatch.h
@@ -19,6 +19,7 @@
  * under the License.
  */
 
+#include <stdbool.h>
 #include <qpid/dispatch/error.h>
 
 /**@file
@@ -34,9 +35,10 @@ typedef struct qd_dispatch_t qd_dispatch_t;
  * Initialize the Dispatch library and prepare it for operation.
  *
  * @param python_pkgdir The path to the Python files.
+ * @param test_hooks Iff true, enable internal system testing features
  * @return A handle to be used in API calls for this instance.
  */
-qd_dispatch_t *qd_dispatch(const char *python_pkgdir);
+qd_dispatch_t *qd_dispatch(const char *python_pkgdir, bool test_hooks);
 
 
 /**

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/router/src/main.c
----------------------------------------------------------------------
diff --git a/router/src/main.c b/router/src/main.c
index c66ba4f..3b7eecd 100644
--- a/router/src/main.c
+++ b/router/src/main.c
@@ -84,9 +84,9 @@ static void check(int fd) {
         check(fd);                                      \
     } while(false)
 
-static void main_process(const char *config_path, const char *python_pkgdir, int fd)
+static void main_process(const char *config_path, const char *python_pkgdir, bool test_hooks, int fd)
 {
-    dispatch = qd_dispatch(python_pkgdir);
+    dispatch = qd_dispatch(python_pkgdir, test_hooks);
     check(fd);
     log_source = qd_log_source("MAIN"); /* Logging is initialized by qd_dispatch. */
     qd_dispatch_validate_config(config_path);
@@ -123,7 +123,7 @@ static void main_process(const char *config_path, const char *python_pkgdir, int
 }
 
 
-static void daemon_process(const char *config_path, const char *python_pkgdir,
+static void daemon_process(const char *config_path, const char *python_pkgdir, bool test_hooks,
                            const char *pidfile, const char *user)
 {
     int pipefd[2];
@@ -243,7 +243,7 @@ static void daemon_process(const char *config_path, const char *python_pkgdir,
                 //if (setgid(pwd->pw_gid) < 0) fail(pipefd[1], "Can't set group ID for user %s, errno=%d", user, errno);
             }
 
-            main_process((config_path_full ? config_path_full : config_path), python_pkgdir, pipefd[1]);
+            main_process((config_path_full ? config_path_full : config_path), python_pkgdir, test_hooks, pipefd[1]);
 
             free(config_path_full);
         } else
@@ -283,6 +283,7 @@ void usage(char **argv) {
     fprintf(stdout, "  -d, --daemon               Run process as a SysV-style daemon\n");
     fprintf(stdout, "  -P, --pidfile              If daemon, the file for the stored daemon pid\n");
     fprintf(stdout, "  -U, --user                 If daemon, the username to run as\n");
+    fprintf(stdout, "  -T, --test-hooks           Enable internal system testing features\n");
     fprintf(stdout, "  -v, --version              Print the version of Qpid Dispatch Router\n");
     fprintf(stdout, "  -h, --help                 Print this help\n");
 }
@@ -295,6 +296,7 @@ int main(int argc, char **argv)
     const char *pidfile = 0;
     const char *user    = 0;
     bool        daemon_mode = false;
+    bool        test_hooks  = false;
 
     static struct option long_options[] = {
     {"config",  required_argument, 0, 'c'},
@@ -304,11 +306,12 @@ int main(int argc, char **argv)
     {"user",    required_argument, 0, 'U'},
     {"help",    no_argument,       0, 'h'},
     {"version", no_argument,       0, 'v'},
+    {"test-hooks", no_argument,    0, 'T'},
     {0,         0,                 0,  0}
     };
 
     while (1) {
-        int c = getopt_long(argc, argv, "c:I:dP:U:h:v", long_options, 0);
+        int c = getopt_long(argc, argv, "c:I:dP:U:h:vT", long_options, 0);
         if (c == -1)
             break;
 
@@ -341,6 +344,10 @@ int main(int argc, char **argv)
             fprintf(stdout, "%s\n", QPID_DISPATCH_VERSION);
             exit(0);
 
+        case 'T' :
+            test_hooks = true;
+            break;
+
         case '?' :
             usage(argv);
             exit(1);
@@ -355,9 +362,9 @@ int main(int argc, char **argv)
     }
 
     if (daemon_mode)
-        daemon_process(config_path, python_pkgdir, pidfile, user);
+        daemon_process(config_path, python_pkgdir, test_hooks, pidfile, user);
     else
-        main_process(config_path, python_pkgdir, 2);
+        main_process(config_path, python_pkgdir, test_hooks, 2);
 
     return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c533e9f..c3d35b9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -85,6 +85,7 @@ set(qpid_dispatch_SOURCES
   router_core/agent_router.c
   router_core/connections.c
   router_core/core_link_endpoint.c
+  router_core/core_test_hooks.c
   router_core/edge_control.c
   router_core/error.c
   router_core/exchange_bindings.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index 2966bf3..9fb2186 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -58,7 +58,7 @@ const char     *MULTICAST_DISTRIBUTION = "multicast";
 const char     *BALANCED_DISTRIBUTION  = "balanced";
 const char     *UNAVAILABLE_DISTRIBUTION = "unavailable";
 
-qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
+qd_dispatch_t *qd_dispatch(const char *python_pkgdir, bool test_hooks)
 {
     qd_dispatch_t *qd = NEW(qd_dispatch_t);
     ZERO(qd);
@@ -84,6 +84,7 @@ qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
     qd_dispatch_set_router_id(qd, strdup("0"));
     qd->router_mode = QD_ROUTER_MODE_ENDPOINT;
     qd->default_treatment   = QD_TREATMENT_LINK_BALANCED;
+    qd->test_hooks          = test_hooks;
 
     qd_python_initialize(qd, python_pkgdir);
     if (qd_error_code()) { qd_dispatch_free(qd); return 0; }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/dispatch_private.h
----------------------------------------------------------------------
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index d927023..36065dc 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -57,6 +57,7 @@ struct qd_dispatch_t {
     char  *router_id;
     qd_router_mode_t  router_mode;
     bool   allow_resumable_link_route;
+    bool   test_hooks;
 };
 
 /**

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 9df65d1..d51dd39 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -816,6 +816,13 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     DEQ_REMOVE(core->open_links, link);
 
     //
+    // If the link has a core_endpoint, allow the core_endpoint module to
+    // clean up its state
+    //
+    if (link->core_endpoint)
+        qdrc_endpoint_do_cleanup_CT(core, link->core_endpoint);
+
+    //
     // If the link has a connected peer, unlink the peer
     //
     if (link->connected_link) {
@@ -1890,96 +1897,101 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
     //
     link->detach_count++;
 
-    //
-    // For routed links, propagate the detach
-    //
-    if (link->connected_link) {
+    if (link->core_endpoint) {
+        qdrc_endpoint_do_detach_CT(core, link->core_endpoint, error);
+
+    } else {
         //
-        // If the connected link is outgoing and there is a delivery on the connected link's undelivered
-        // list that is not receive-complete, we must flag that delivery as aborted or it will forever
-        // block the propagation of the detach.
+        // For routed links, propagate the detach
         //
-        if (link->connected_link->link_direction == QD_OUTGOING)
-            qdr_link_abort_undelivered_CT(core, link->connected_link);
+        if (link->connected_link) {
+            //
+            // If the connected link is outgoing and there is a delivery on the connected link's undelivered
+            // list that is not receive-complete, we must flag that delivery as aborted or it will forever
+            // block the propagation of the detach.
+            //
+            if (link->connected_link->link_direction == QD_OUTGOING)
+                qdr_link_abort_undelivered_CT(core, link->connected_link);
 
-        if (dt != QD_LOST)
-            qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt == QD_CLOSED);
-        else {
-            qdr_link_outbound_detach_CT(core, link->connected_link, 0, QDR_CONDITION_ROUTED_LINK_LOST, !link->terminus_survives_disconnect);
-            qdr_error_free(error);
+            if (dt != QD_LOST)
+                qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt == QD_CLOSED);
+            else {
+                qdr_link_outbound_detach_CT(core, link->connected_link, 0, QDR_CONDITION_ROUTED_LINK_LOST, !link->terminus_survives_disconnect);
+                qdr_error_free(error);
+            }
+
+            //
+            // If the link is completely detached, release its resources
+            //
+            if (link->detach_count == 2) {
+                qdr_link_cleanup_CT(core, conn, link);
+                free_qdr_link_t(link);
+            }
+
+            return;
         }
 
         //
-        // If the link is completely detached, release its resources
+        // For auto links, switch the auto link to failed state and record the error
         //
-        if (link->detach_count == 2) {
-            qdr_link_cleanup_CT(core, conn, link);
-            free_qdr_link_t(link);
-        }
+        if (link->auto_link) {
+            link->auto_link->link  = 0;
+            link->auto_link->state = QDR_AUTO_LINK_STATE_FAILED;
+            free(link->auto_link->last_error);
+            link->auto_link->last_error = qdr_error_description(error);
 
-        return;
-    }
+            //
+            // The auto link has failed. Periodically retry setting up the auto link until
+            // it succeeds.
+            //
+            qdr_route_auto_link_detached_CT(core, link);
+        }
 
-    //
-    // For auto links, switch the auto link to failed state and record the error
-    //
-    if (link->auto_link) {
-        link->auto_link->link  = 0;
-        link->auto_link->state = QDR_AUTO_LINK_STATE_FAILED;
-        free(link->auto_link->last_error);
-        link->auto_link->last_error = qdr_error_description(error);
+        if (link->link_direction == QD_INCOMING) {
+            //
+            // Handle incoming link cases
+            //
+            switch (link->link_type) {
+            case QD_LINK_ENDPOINT:
+                if (addr) {
+                    //
+                    // Drain the undelivered list to ensure deliveries don't get dropped by a detach.
+                    //
+                    qdr_drain_inbound_undelivered_CT(core, link, addr);
 
-        //
-        // The auto link has failed. Periodically retry setting up the auto link until
-        // it succeeds.
-        //
-        qdr_route_auto_link_detached_CT(core, link);
-    }
+                    //
+                    // Remove this link from the linked address.
+                    //
+                    qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
+                }
+                break;
 
-    if (link->link_direction == QD_INCOMING) {
-        //
-        // Handle incoming link cases
-        //
-        switch (link->link_type) {
-        case QD_LINK_ENDPOINT:
-            if (addr) {
-                //
-                // Drain the undelivered list to ensure deliveries don't get dropped by a detach.
-                //
-                qdr_drain_inbound_undelivered_CT(core, link, addr);
+            case QD_LINK_CONTROL:
+                break;
 
-                //
-                // Remove this link from the linked address.
-                //
-                qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
+            case QD_LINK_ROUTER:
+                break;
             }
-            break;
+        } else {
+            //
+            // Handle outgoing link cases
+            //
+            switch (link->link_type) {
+            case QD_LINK_ENDPOINT:
+                if (addr) {
+                    qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
+                    was_local = true;
+                }
+                break;
 
-        case QD_LINK_CONTROL:
-            break;
+            case QD_LINK_CONTROL:
+                qdr_detach_link_control_CT(core, conn, link);
+                break;
 
-        case QD_LINK_ROUTER:
-            break;
-        }
-    } else {
-        //
-        // Handle outgoing link cases
-        //
-        switch (link->link_type) {
-        case QD_LINK_ENDPOINT:
-            if (addr) {
-                qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
-                was_local = true;
+            case QD_LINK_ROUTER:
+                qdr_detach_link_data_CT(core, conn, link);
+                break;
             }
-            break;
-
-        case QD_LINK_CONTROL:
-            qdr_detach_link_control_CT(core, conn, link);
-            break;
-
-        case QD_LINK_ROUTER:
-            qdr_detach_link_data_CT(core, conn, link);
-            break;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/core_link_endpoint.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index b8b6edc..961c219 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -19,6 +19,7 @@
 
 #include "core_link_endpoint.h"
 #include "qpid/dispatch/alloc.h"
+#include <stdio.h>
 
 struct qdrc_endpoint_t {
     qdrc_endpoint_desc_t *desc;
@@ -40,7 +41,7 @@ void qdrc_endpoint_bind_mobile_address_CT(qdr_core_t           *core,
     qd_iterator_annotate_phase(iter, phase);
 
     qd_hash_retrieve(core->addr_hash, iter, (void*) &addr);
-    if (addr) {
+    if (!addr) {
         qd_address_treatment_t treatment = qdr_treatment_for_address_CT(core, 0, iter, 0, 0);
         if (treatment == QD_TREATMENT_UNAVAILABLE)
             treatment = QD_TREATMENT_ANYCAST_BALANCED;
@@ -110,6 +111,21 @@ void qdrc_endpoint_send_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t
 }
 
 
+qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qd_message_t *message)
+{
+    qdr_delivery_t *dlv = new_qdr_delivery_t();
+    uint64_t       *tag = (uint64_t*) dlv->tag;
+
+    ZERO(dlv);
+    dlv->link           = endpoint->link;
+    dlv->msg            = message;
+    *tag                = core->next_tag++;
+    dlv->tag_length = 8;
+    dlv->ingress_index = -1;
+    return dlv;
+}
+
+
 void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *dlv, uint64_t disposition)
 {
     //
@@ -133,6 +149,10 @@ void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *dlv, uint64_t dis
 void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
 {
     qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE, true);
+    if (ep->link->detach_count == 2) {
+        ep->link->core_endpoint = 0;
+        free_qdrc_endpoint_t(ep);
+    }
 }
 
 
@@ -143,13 +163,48 @@ bool qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr
     ep->desc = addr->core_endpoint;
     ep->link = link;
 
+    link->core_endpoint = ep;
+
     *error = 0;
     bool accept = !!ep->desc->on_first_attach ?
         ep->desc->on_first_attach(addr->core_endpoint_context, ep, &ep->link_context, error) : false;
 
-    if (!accept)
+    if (!accept) {
+        link->core_endpoint = 0;
         free_qdrc_endpoint_t(ep);
+    }
 
     return accept;
 }
 
+
+
+void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t *dlv)
+{
+    ep->desc->on_transfer(ep->link_context, dlv, dlv->msg);
+}
+
+
+void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, bool drain)
+{
+    ep->desc->on_flow(ep->link_context, credit, drain);
+}
+
+
+void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
+{
+    ep->desc->on_detach(ep->link_context, error);
+    if (ep->link->detach_count == 2) {
+        ep->link->core_endpoint = 0;
+        free_qdrc_endpoint_t(ep);
+    }
+}
+
+
+void qdrc_endpoint_do_cleanup_CT(qdr_core_t *core, qdrc_endpoint_t *ep)
+{
+    ep->desc->on_cleanup(ep->link_context);
+    free_qdrc_endpoint_t(ep);
+}
+
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/core_link_endpoint.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_link_endpoint.h b/src/router_core/core_link_endpoint.h
index 1bfbb3c..4e9c2a0 100644
--- a/src/router_core/core_link_endpoint.h
+++ b/src/router_core/core_link_endpoint.h
@@ -32,12 +32,12 @@ typedef struct qdrc_endpoint_t qdrc_endpoint_t;
 /**
  * Event - An attach for a new core-endpoint link has arrived
  *
- * @Param bind_context The opaque context provided in the mobile address binding
- * @Param endpoint A new endpoint object for the link.  If the link is accepted, this
+ * @param bind_context The opaque context provided in the mobile address binding
+ * @param endpoint A new endpoint object for the link.  If the link is accepted, this
  *                 object must be stored for later use.
- * @Param link_context [out] Handler-provided opaque context to be associated with the endpoint
- * @Param error [out] Error indication which may be supplied if the link is rejected
- * @Return True if the link is to be accepted, False if the link should be rejected and detached
+ * @param link_context [out] Handler-provided opaque context to be associated with the endpoint
+ * @param error [out] Error indication which may be supplied if the link is rejected
+ * @return True if the link is to be accepted, False if the link should be rejected and detached
  */
 typedef bool (*qdrc_first_attach_t) (void             *bind_context,
                                      qdrc_endpoint_t  *endpoint,
@@ -51,16 +51,20 @@ typedef bool (*qdrc_first_attach_t) (void             *bind_context,
  * is the responsibility of the core-endpoint to supply credit at the appropriate time
  * by calling qdrc_endpoint_flow_CT.
  *
- * @Param link_context The opaque context supplied in the call to qdrc_endpoint_create_link_CT
+ * @param link_context The opaque context supplied in the call to qdrc_endpoint_create_link_CT
+ * @param remote_source Pointer to the remote source terminus of the link
+ * @param remote_target Pointer to the remote target terminus of the link
  */
-typedef bool (*qdrc_second_attach_t) (void *link_context);
+typedef void (*qdrc_second_attach_t) (void           *link_context,
+                                      qdr_terminus_t *remote_source,
+                                      qdr_terminus_t *remote_target);
 
 /**
  * Event - Credit/Drain status for an outgoing core-endpoint link has changed
  *
- * @Param link_context The opaque context associated with the endpoint link
- * @Param available_credit The number of deliveries that may be sent on this link
- * @Param drain True iff the peer receiver is requesting that the credit be drained
+ * @param link_context The opaque context associated with the endpoint link
+ * @param available_credit The number of deliveries that may be sent on this link
+ * @param drain True iff the peer receiver is requesting that the credit be drained
  */
 typedef void (*qdrc_flow_t) (void *link_context,
                              int   available_credit,
@@ -69,10 +73,10 @@ typedef void (*qdrc_flow_t) (void *link_context,
 /**
  * Event - The settlement and/or disposition of a delivery has been updated
  *
- * @Param link_context The opaque context associated with the endpoint link
- * @Param delivery The delivery object experiencing the change
- * @Param settled True iff the delivery has been settled by the peer
- * @Param disposition The disposition of the delivery (PN_[ACCEPTED|REJECTED|MODIFIED|RELEASED])
+ * @param link_context The opaque context associated with the endpoint link
+ * @param delivery The delivery object experiencing the change
+ * @param settled True iff the delivery has been settled by the peer
+ * @param disposition The disposition of the delivery (PN_[ACCEPTED|REJECTED|MODIFIED|RELEASED])
  */
 typedef void (*qdrc_update_t) (void           *link_context,
                                qdr_delivery_t *delivery,
@@ -86,9 +90,9 @@ typedef void (*qdrc_update_t) (void           *link_context,
  * required, this handler _must_ use the qd_message_receive_complete method on the
  * message to ensure the message has been completely received.
  *
- * @Param link_context The opaque context associated with the endpoint link
- * @Param delivery Pointer to the delivery object for the transfer
- * @Param message Pointer to the message being transferred
+ * @param link_context The opaque context associated with the endpoint link
+ * @param delivery Pointer to the delivery object for the transfer
+ * @param message Pointer to the message being transferred
  */
 typedef void (*qdrc_transfer_t) (void           *link_context,
                                  qdr_delivery_t *delivery,
@@ -97,13 +101,26 @@ typedef void (*qdrc_transfer_t) (void           *link_context,
 /**
  * Event - A core-endpoint link has been detached
  *
- * @Param link_context The opaque context associated with the endpoint link
- * @Param error The error information that came with the detach or 0 if no error
+ * Note: It is safe to discard objects referenced by the link_context in this handler.
+ *       There will be no further references to this link_context returned after this call.
+ *
+ * @param link_context The opaque context associated with the endpoint link
+ * @param error The error information that came with the detach or 0 if no error
  */
 typedef void (*qdrc_detach_t) (void        *link_context,
                                qdr_error_t *error);
 
 
+/**
+ * Event - A core-endpoint link has been cleaned up (not cleanly detached)
+ *
+ * This handler must free all resources associated with the link-context.
+ *
+ * @param link_context The opaque context associated with the endpoint link
+ */
+typedef void (*qdrc_cleanup_t) (void *link_context);
+
+
 typedef struct qdrc_endpoint_desc_t {
     qdrc_first_attach_t   on_first_attach;
     qdrc_second_attach_t  on_second_attach;
@@ -111,6 +128,7 @@ typedef struct qdrc_endpoint_desc_t {
     qdrc_update_t         on_update;
     qdrc_transfer_t       on_transfer;
     qdrc_detach_t         on_detach;
+    qdrc_cleanup_t        on_cleanup;
 } qdrc_endpoint_desc_t;
 
 
@@ -121,11 +139,11 @@ typedef struct qdrc_endpoint_desc_t {
  * to the core-endpoint.  Any incoming links with terminus addresses that match
  * this address will be directed to the core-endpoint for handling.
  *
- * @Param core Pointer to the core object
- * @Param address The address as a null-terminated character string
- * @Param phase The phase of the address (typically '0')
- * @Param desc The descriptor for this core endpoint containing all callbacks
- * @Param bind_context An opaque context that will be included in the call to on_first_attach
+ * @param core Pointer to the core object
+ * @param address The address as a null-terminated character string
+ * @param phase The phase of the address (typically '0')
+ * @param desc The descriptor for this core endpoint containing all callbacks
+ * @param bind_context An opaque context that will be included in the call to on_first_attach
  */
 void qdrc_endpoint_bind_mobile_address_CT(qdr_core_t           *core,
                                           const char           *address,
@@ -140,14 +158,14 @@ void qdrc_endpoint_bind_mobile_address_CT(qdr_core_t           *core,
  * Initiate the attachment of a new link outbound to a remote node.  The link will
  * be known to be fully attached when the on_second_attach callback is invoked.
  *
- * @Param core Pointer to the core object
- * @Param conn Pointer to the connection object over which the link will be created
- * @Param dir The direction of the link: QD_INCOMING or QD_OUTGOING
- * @Param local_source The source terminus of the link - must be included for incoming links
- * @Param local_target The target terminus of the link - must be included for outgoing links
- * @Param desc The descriptor for this core endpoint containing all callbacks
- * @Param link_context An opaque context that will be included in the calls to the callbacks
- * @Return Pointer to a new qdrc_endpoint_t for tracking the link
+ * @param core Pointer to the core object
+ * @param conn Pointer to the connection object over which the link will be created
+ * @param dir The direction of the link: QD_INCOMING or QD_OUTGOING
+ * @param local_source The source terminus of the link - must be included for incoming links
+ * @param local_target The target terminus of the link - must be included for outgoing links
+ * @param desc The descriptor for this core endpoint containing all callbacks
+ * @param link_context An opaque context that will be included in the calls to the callbacks
+ * @return Pointer to a new qdrc_endpoint_t for tracking the link
  */
 qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t           *core,
                                               qdr_connection_t     *conn,
@@ -163,8 +181,8 @@ qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t           *core,
  *  - The link's direction of delivery flow
  *  - The link's connection
  *
- * @Param endpoint Pointer to an endpoint object
- * @Return The requested information (or 0 if not present)
+ * @param endpoint Pointer to an endpoint object
+ * @return The requested information (or 0 if not present)
  */
 qd_direction_t    qdrc_endpoint_get_direction_CT(const qdrc_endpoint_t *endpoint);
 qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *endpoint);
@@ -172,40 +190,50 @@ qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *endpoint);
 /**
  * Issue credit and control drain for an incoming link
  *
- * @Param core Pointer to the core object
- * @Param endpoint Pointer to an endpoint object
- * @Param credit_added Number of credits to grant to the sender
- * @Param drain Indication that you want the sender to drain available credit
+ * @param core Pointer to the core object
+ * @param endpoint Pointer to an endpoint object
+ * @param credit_added Number of credits to grant to the sender
+ * @param drain Indication that you want the sender to drain available credit
  */
 void qdrc_endpoint_flow_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, int credit_added, bool drain);
 
 /**
  * Send a message via an outgoing link
  *
- * @Param core Pointer to the core object
- * @Param endpoint Pointer to an endpoint object
- * @Param delivery A delivery containing a message that is to be sent on the link
- * @Param presettled True iff the delivery is to be presettled.  If presettled, no further action
+ * @param core Pointer to the core object
+ * @param endpoint Pointer to an endpoint object
+ * @param delivery A delivery containing a message that is to be sent on the link
+ * @param presettled True iff the delivery is to be presettled.  If presettled, no further action
  *                   will be needed for the delivery.  If not presettled, an on_update event for
  *                   the delivery should be expected.
  */
 void qdrc_endpoint_send_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_delivery_t *delivery, bool presettled);
 
 /**
- * Settle a received delivery with a specified disposition
+ * Allocate a delivery from a given endpoint
  *
  * @Param core Pointer to the core object
- * @Param delivery Pointer to a received delivery
- * @Param disposition The desired disposision of the delivery (use PN_[ACCEPTED|REJECTED|MODIFIED|RELEASED])
+ * @param endpoint Pointer to an endpoint object
+ * @param message An outgoing message to be associated with the delivery
+ * @return A delivery that can be passed to qdrc_endpoint_send_CT
+ */
+qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qd_message_t *message);
+
+/**
+ * Settle a received delivery with a specified disposition
+ *
+ * @param core Pointer to the core object
+ * @param delivery Pointer to a received delivery
+ * @param disposition The desired disposition of the delivery (use PN_[ACCEPTED|REJECTED|MODIFIED|RELEASED])
  */
 void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition);
 
 /**
  * Detach a link attached to the core-endpoint
  *
- * @Param core Pointer to the core object
- * @Param endpoint Pointer to an endpoint object
- * @Param error An error indication or 0 for no error
+ * @param core Pointer to the core object
+ * @param endpoint Pointer to an endpoint object
+ * @param error An error indication or 0 for no error
  */
 void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error);
 
@@ -215,5 +243,9 @@ void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_er
 //=====================================================================================
 
 bool qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_error_t **error);
+void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_delivery_t *delivery);
+void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, int credit, bool drain);
+void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error);
+void qdrc_endpoint_do_cleanup_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint);
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/core_test_hooks.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_test_hooks.c b/src/router_core/core_test_hooks.c
new file mode 100644
index 0000000..05cb75d
--- /dev/null
+++ b/src/router_core/core_test_hooks.c
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/message.h"
+#include "qpid/dispatch/compose.h"
+#include "core_test_hooks.h"
+#include "core_link_endpoint.h"
+#include <stdio.h>
+#include <inttypes.h>
+
+/**
+ * Behaviors that a test node can exhibit.  The setup function in this file
+ * creates one node per behavior and binds each to its own test address.
+ */
+typedef enum {
+    // Accepts links in either direction; the flow and transfer handlers take no action
+    TEST_NODE_ECHO,
+    // Refuses every attach with a qd:forbidden error
+    TEST_NODE_DENY,
+    // Incoming links only; each completed delivery is settled with PN_ACCEPTED
+    TEST_NODE_SINK,
+    // Outgoing links only; sends unsettled deliveries when credit is granted
+    TEST_NODE_SOURCE,
+    // Outgoing links only; sends presettled deliveries when credit is granted
+    TEST_NODE_SOURCE_PS,
+    // Incoming links only; each completed delivery is settled with PN_REJECTED
+    TEST_NODE_DISCARD
+} test_node_behavior_t;
+
+typedef struct test_node_t test_node_t;
+
+/**
+ * Per-link state kept by a test node.  One of these is allocated in first_attach
+ * for every accepted link and handed back to the core as the link context.
+ */
+typedef struct test_endpoint_t {
+    DEQ_LINKS(struct test_endpoint_t);
+    // The test node this endpoint is attached to
+    test_node_t         *node;
+    // The core endpoint handle supplied to first_attach
+    qdrc_endpoint_t     *ep;
+    // NOTE(review): not referenced by any handler in this file
+    qdr_delivery_list_t  deliveries;
+    // Credit recorded by the flow handler; decremented once per message in source_send
+    int                  credit;
+    // True while an endpoint_action referencing this endpoint is queued
+    bool                 in_action_list;
+    // Set by detach when an action is still queued; endpoint_action then frees the endpoint
+    bool                 detached;
+} test_endpoint_t;
+
+DEQ_DECLARE(test_endpoint_t, test_endpoint_list_t);
+
+/**
+ * A test node: one bound address with a fixed behavior and the lists of
+ * endpoints currently attached to it.
+ */
+struct test_node_t {
+    // The router core this node is bound into
+    qdr_core_t           *core;
+    // How the node reacts to attach, flow and transfer events
+    test_node_behavior_t  behavior;
+    // The callback table used when the node's address was bound
+    qdrc_endpoint_desc_t *desc;
+    // Endpoints whose link direction is QD_INCOMING
+    test_endpoint_list_t  in_links;
+    // Endpoints whose link direction is not QD_INCOMING
+    test_endpoint_list_t  out_links;
+};
+
+static test_node_t *echo_node;
+static test_node_t *deny_node;
+static test_node_t *sink_node;
+static test_node_t *source_node;
+static test_node_t *source_ps_node;
+static test_node_t *discard_node;
+
+
+static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+
+/**
+ * Compose one test message and send it on the given endpoint.
+ *
+ * The message carries a non-durable header, empty properties, an application
+ * property "sequence" holding the current sequence number, and a string body of
+ * the form "Sequence: <n>".  The sequence counter is a function-level static, so
+ * it is shared by every endpoint that sends through this function.
+ *
+ * After sending, the endpoint's credit is decremented.  If credit remains, an
+ * endpoint_action is queued so that the next message is sent from a later action
+ * rather than from a loop inside this call.
+ *
+ * @param ep The test endpoint to send on
+ * @param presettled True iff the delivery is to be sent presettled
+ */
+static void source_send(test_endpoint_t *ep, bool presettled)
+{
+    static uint32_t      sequence = 0;
+    static char          stringbuf[100];
+    qdr_delivery_t      *dlv;
+    qd_message_t        *msg   = qd_message();
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+
+    // Bounded formatting: never write past the end of stringbuf.
+    snprintf(stringbuf, sizeof(stringbuf), "Sequence: %"PRIu32, sequence);
+
+    qd_compose_start_list(field);
+    qd_compose_insert_bool(field, 0);     // durable
+    qd_compose_end_list(field);
+
+    field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
+    qd_compose_start_list(field);
+    qd_compose_insert_null(field);        // message-id
+    qd_compose_end_list(field);
+
+    field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
+    qd_compose_start_map(field);
+    qd_compose_insert_symbol(field, "sequence");
+    qd_compose_insert_uint(field, sequence++);
+    qd_compose_end_map(field);
+
+    field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
+    qd_compose_insert_string(field, stringbuf);
+
+    dlv = qdrc_endpoint_delivery_CT(ep->node->core, ep->ep, msg);
+    qd_message_compose_2(msg, field);
+    qd_compose_free(field);
+    qdrc_endpoint_send_CT(ep->node->core, ep->ep, dlv, presettled);
+
+    if (--ep->credit > 0) {
+        //
+        // More credit is available.  Schedule the next send as a separate action
+        // and record that the endpoint is referenced from the action list so that
+        // detach does not free it while the action is pending.
+        //
+        qdr_action_t *action = qdr_action(endpoint_action, "test_hooks_endpoint_action");
+        action->args.general.context_1 = (void*) ep;
+        ep->in_action_list = true;
+        qdr_action_enqueue(ep->node->core, action);
+    }
+}
+
+
+/**
+ * Unlink a test endpoint from its node and release its memory.
+ *
+ * The list the endpoint is removed from is chosen by the direction of the
+ * underlying core endpoint: in_links for QD_INCOMING, out_links otherwise.
+ *
+ * @param ep The test endpoint to free; must not be used after this call
+ */
+static void free_endpoint(test_endpoint_t *ep)
+{
+    test_node_t *owner = ep->node;
+
+    if (qdrc_endpoint_get_direction_CT(ep->ep) != QD_INCOMING) {
+        DEQ_REMOVE(owner->out_links, ep);
+    } else {
+        DEQ_REMOVE(owner->in_links, ep);
+    }
+
+    free(ep);
+}
+
+
+/**
+ * Action handler queued by source_send to continue sending on an endpoint.
+ *
+ * Clears the endpoint's in_action_list flag.  If the link was detached while the
+ * action was pending, the endpoint is freed here and nothing is sent.  Otherwise
+ * the source behaviors send their next message; all other behaviors do nothing.
+ *
+ * @param core Pointer to the core object (unused)
+ * @param action The action; args.general.context_1 holds the test_endpoint_t
+ * @param discard True iff the action is being discarded rather than run, in
+ *                which case the endpoint is not touched
+ */
+static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)
+        return;
+
+    test_endpoint_t *ep = (test_endpoint_t*) action->args.general.context_1;
+
+    ep->in_action_list = false;
+    if (ep->detached) {
+        free_endpoint(ep);
+        return;
+    }
+
+    switch (ep->node->behavior) {
+    case TEST_NODE_DENY :
+    case TEST_NODE_SINK :
+    case TEST_NODE_DISCARD :
+        // These behaviors never originate deliveries; do not fall through to the senders.
+        break;
+
+    case TEST_NODE_SOURCE :
+        source_send(ep, false);
+        break;
+
+    case TEST_NODE_SOURCE_PS :
+        source_send(ep, true);
+        break;
+
+    case TEST_NODE_ECHO :
+        break;
+    }
+}
+
+
+/**
+ * Handle the first attach of a link to one of the test addresses.
+ *
+ * The node's behavior decides whether the link is accepted:
+ *   - deny:              always refused
+ *   - echo:              accepted in either direction
+ *   - sink, discard:     incoming links only; one credit is issued on accept
+ *   - source, source_ps: outgoing links only
+ *
+ * On acceptance a zeroed test_endpoint_t is allocated, stored in *link_context
+ * and appended to the node's in_links or out_links list.
+ *
+ * @param bind_context The test_node_t that was bound to the address
+ * @param endpoint The core endpoint for the new link
+ * @param link_context [out] Receives the new test_endpoint_t on acceptance
+ * @param error [out] Receives an error object when the attach is refused
+ * @return True iff the link is accepted
+ */
+static bool first_attach(void             *bind_context,
+                         qdrc_endpoint_t  *endpoint,
+                         void            **link_context,
+                         qdr_error_t     **error)
+{
+    test_node_t *node     = (test_node_t*) bind_context;
+    const bool   incoming = qdrc_endpoint_get_direction_CT(endpoint) == QD_INCOMING;
+
+    switch (node->behavior) {
+    case TEST_NODE_DENY :
+        *error = qdr_error("qd:forbidden", "Connectivity to the deny node is forbidden");
+        return false;
+
+    case TEST_NODE_ECHO :
+        break;
+
+    case TEST_NODE_SINK :
+        if (!incoming) {
+            *error = qdr_error("qd:forbidden", "Sink function only accepts incoming links");
+            return false;
+        }
+        qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
+        break;
+
+    case TEST_NODE_SOURCE :
+    case TEST_NODE_SOURCE_PS :
+        if (incoming) {
+            *error = qdr_error("qd:forbidden", "Source function only accepts outgoing links");
+            return false;
+        }
+        break;
+
+    case TEST_NODE_DISCARD :
+        if (!incoming) {
+            *error = qdr_error("qd:forbidden", "Discard function only accepts incoming links");
+            return false;
+        }
+        qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
+        break;
+    }
+
+    //
+    // The link is accepted.  Create the per-link state and track it on the node.
+    //
+    test_endpoint_t *accepted = NEW(test_endpoint_t);
+    ZERO(accepted);
+    accepted->node = node;
+    accepted->ep   = endpoint;
+    *link_context  = accepted;
+
+    if (incoming) {
+        DEQ_INSERT_TAIL(node->in_links, accepted);
+    } else {
+        DEQ_INSERT_TAIL(node->out_links, accepted);
+    }
+
+    return true;
+}
+
+
+/**
+ * Second-attach callback for the test nodes.  Intentionally empty: the test
+ * nodes take no action on this event.
+ *
+ * @param link_context The test_endpoint_t for the link (unused)
+ * @param remote_source The remote source terminus (unused)
+ * @param remote_target The remote target terminus (unused)
+ */
+static void second_attach(void           *link_context,
+                          qdr_terminus_t *remote_source,
+                          qdr_terminus_t *remote_target)
+{
+}
+
+
+/**
+ * Flow callback: record the credit granted on a link and, for the source
+ * behaviors, start sending.
+ *
+ * A flow with zero available credit is ignored.  Otherwise the endpoint's credit
+ * is overwritten with available_credit.  The source node then sends an unsettled
+ * message and the source_ps node a presettled one; all other behaviors only
+ * record the credit.
+ *
+ * @param link_context The test_endpoint_t for the link
+ * @param available_credit The credit reported for the link
+ * @param drain The drain flag from the flow (unused)
+ */
+static void flow(void *link_context,
+                 int   available_credit,
+                 bool  drain)
+{
+    test_endpoint_t *test_ep = (test_endpoint_t*) link_context;
+
+    if (available_credit == 0)
+        return;
+
+    test_ep->credit = available_credit;
+
+    const test_node_behavior_t behavior = test_ep->node->behavior;
+
+    if (behavior == TEST_NODE_SOURCE) {
+        source_send(test_ep, false);
+    } else if (behavior == TEST_NODE_SOURCE_PS) {
+        source_send(test_ep, true);
+    }
+    // DENY, SINK, DISCARD and ECHO take no action on flow.
+}
+
+
+/**
+ * Delivery-update callback for the test nodes.  Intentionally empty: the test
+ * nodes ignore disposition and settlement updates.
+ *
+ * @param link_context The test_endpoint_t for the link (unused)
+ * @param delivery The delivery being updated (unused)
+ * @param settled The settled flag passed with the update (unused)
+ * @param disposition The disposition passed with the update (unused)
+ */
+static void update(void           *link_context,
+                   qdr_delivery_t *delivery,
+                   bool            settled,
+                   uint64_t        disposition)
+{
+}
+
+
+/**
+ * Transfer callback: handle a delivery arriving on an incoming link.
+ *
+ * Nothing happens until the message has been completely received.  The sink
+ * node then settles the delivery with PN_ACCEPTED and the discard node with
+ * PN_REJECTED; both replenish one credit afterwards.  The echo node takes no
+ * action.  The deny and source nodes never accept incoming links, so reaching
+ * this handler for them trips an assertion.
+ *
+ * @param link_context The test_endpoint_t for the link
+ * @param delivery The incoming delivery
+ * @param message The message carried by the delivery
+ */
+static void transfer(void           *link_context,
+                     qdr_delivery_t *delivery,
+                     qd_message_t   *message)
+{
+    test_endpoint_t *test_ep = (test_endpoint_t*) link_context;
+    uint64_t         outcome;
+
+    if (!qd_message_receive_complete(message))
+        return;
+
+    switch (test_ep->node->behavior) {
+    case TEST_NODE_SINK :
+        outcome = PN_ACCEPTED;
+        break;
+
+    case TEST_NODE_DISCARD :
+        outcome = PN_REJECTED;
+        break;
+
+    case TEST_NODE_DENY :
+    case TEST_NODE_SOURCE :
+    case TEST_NODE_SOURCE_PS :
+        assert(false); // Can't get here.  Link should not have been opened
+        return;
+
+    case TEST_NODE_ECHO :
+    default :
+        return;
+    }
+
+    //
+    // Settle with the chosen outcome, then issue one more credit to the sender.
+    //
+    qdrc_endpoint_settle_CT(test_ep->node->core, delivery, outcome);
+    qdrc_endpoint_flow_CT(test_ep->node->core, test_ep->ep, 1, false);
+}
+
+
+/**
+ * Detach callback: release the per-link state for a detached link.
+ *
+ * If an endpoint_action that references the endpoint is still queued, the
+ * endpoint is only marked as detached and endpoint_action frees it when it runs.
+ * Otherwise the endpoint is freed immediately.
+ *
+ * @param link_context The test_endpoint_t for the link
+ * @param error The error passed with the detach (unused)
+ */
+static void detach(void        *link_context,
+                   qdr_error_t *error)
+{
+    test_endpoint_t *test_ep = (test_endpoint_t*) link_context;
+
+    if (!test_ep->in_action_list) {
+        free_endpoint(test_ep);
+        return;
+    }
+
+    // A queued action still points at this endpoint; defer the free to it.
+    test_ep->detached = true;
+}
+
+
+/**
+ * Cleanup callback for the test nodes.  Intentionally empty: no per-link
+ * resources are released here.
+ *
+ * @param link_context The test_endpoint_t for the link (unused)
+ */
+static void cleanup(void *link_context)
+{
+}
+
+
+//
+// Callback table shared by all of the test nodes.  The per-node behavior is
+// selected inside the callbacks from the node found through the bind or link context.
+//
+static qdrc_endpoint_desc_t descriptor = {first_attach, second_attach, flow, update, transfer, detach, cleanup};
+
+
+/**
+ * Allocate a test node, initialize it with the given behavior and bind it to
+ * a mobile address (phase '0') using the shared descriptor.
+ *
+ * @param core Pointer to the core object
+ * @param behavior The behavior the node will exhibit
+ * @param address The address to bind the node to
+ * @return The newly allocated node
+ */
+static test_node_t *bind_test_node(qdr_core_t *core, test_node_behavior_t behavior, char *address)
+{
+    test_node_t *node = NEW(test_node_t);
+
+    node->core     = core;
+    node->behavior = behavior;
+    node->desc     = &descriptor;
+    DEQ_INIT(node->in_links);
+    DEQ_INIT(node->out_links);
+    qdrc_endpoint_bind_mobile_address_CT(core, address, '0', &descriptor, node);
+
+    return node;
+}
+
+
+/**
+ * Create the six test nodes and bind each one to its address under
+ * "org.apache.qpid.dispatch.router/test/".
+ *
+ * @param core Pointer to the core object
+ */
+static void qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core)
+{
+    echo_node      = bind_test_node(core, TEST_NODE_ECHO,      "org.apache.qpid.dispatch.router/test/echo");
+    deny_node      = bind_test_node(core, TEST_NODE_DENY,      "org.apache.qpid.dispatch.router/test/deny");
+    sink_node      = bind_test_node(core, TEST_NODE_SINK,      "org.apache.qpid.dispatch.router/test/sink");
+    source_node    = bind_test_node(core, TEST_NODE_SOURCE,    "org.apache.qpid.dispatch.router/test/source");
+    source_ps_node = bind_test_node(core, TEST_NODE_SOURCE_PS, "org.apache.qpid.dispatch.router/test/source_ps");
+    discard_node   = bind_test_node(core, TEST_NODE_DISCARD,   "org.apache.qpid.dispatch.router/test/discard");
+}
+
+
+/**
+ * Release the six test nodes created by the setup function.  Only the node
+ * structures themselves are freed here.
+ *
+ * @param core Pointer to the core object (unused)
+ */
+static void qdrc_test_hooks_core_endpoint_finalize(qdr_core_t *core)
+{
+    test_node_t *nodes[] = {
+        echo_node,
+        deny_node,
+        sink_node,
+        source_node,
+        source_ps_node,
+        discard_node
+    };
+
+    for (size_t idx = 0; idx < sizeof(nodes) / sizeof(nodes[0]); idx++) {
+        free(nodes[idx]);
+    }
+}
+
+
+/**
+ * Initialize the core-thread test hooks.
+ *
+ * @param core Pointer to the core object
+ */
+void qdrc_test_hooks_init_CT(qdr_core_t *core)
+{
+    //
+    // The test hooks are opt-in (enabled by the --test-hooks command line option).
+    // Do nothing when they are not enabled.
+    //
+    if (core->qd->test_hooks) {
+        qd_log(core->log, QD_LOG_INFO, "Core thread system test hooks enabled");
+        qdrc_test_hooks_core_endpoint_setup(core);
+    }
+}
+
+
+/**
+ * Finalize the core-thread test hooks.
+ *
+ * @param core Pointer to the core object
+ */
+void qdrc_test_hooks_final_CT(qdr_core_t *core)
+{
+    //
+    // The test hooks are opt-in (enabled by the --test-hooks command line option).
+    // There is nothing to release when they were never enabled.
+    //
+    if (core->qd->test_hooks) {
+        qdrc_test_hooks_core_endpoint_finalize(core);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/core_test_hooks.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_test_hooks.h b/src/router_core/core_test_hooks.h
new file mode 100644
index 0000000..7be20f3
--- /dev/null
+++ b/src/router_core/core_test_hooks.h
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef qd_router_core_test_hooks
+#define qd_router_core_test_hooks 1
+
+#include "router_core_private.h"
+
+void qdrc_test_hooks_init_CT(qdr_core_t *core);
+void qdrc_test_hooks_final_CT(qdr_core_t *core);
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 13c7dc4..1a4450d 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -154,6 +154,16 @@ struct qdr_action_t {
             qd_buffer_list_t         body_buffers;
         } agent;
 
+        //
+        // Arguments for general use
+        //
+        struct {
+            void *context_1;
+            void *context_2;
+            void *context_3;
+            void *context_4;
+        } general;
+
     } args;
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index e5b00f5..ae6bbdc 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -18,6 +18,7 @@
  */
 
 #include "router_core_private.h"
+#include "core_test_hooks.h"
 
 /**
  * Creates a thread that is dedicated to managing and using the routing table.
@@ -51,6 +52,7 @@ void *router_core_thread(void *arg)
     qdr_forwarder_setup_CT(core);
     qdr_route_table_setup_CT(core);
     qdr_agent_setup_CT(core);
+    qdrc_test_hooks_init_CT(core);
 
     qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s", core->router_area, core->router_id);
     while (core->running) {
@@ -91,6 +93,7 @@ void *router_core_thread(void *arg)
         qdr_activate_connections_CT(core);
     }
 
+    qdrc_test_hooks_final_CT(core);
     qd_log(core->log, QD_LOG_INFO, "Router Core thread exited");
     return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 68645ea..361e2e1 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -748,11 +748,13 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
         activate = true;
     }
 
-    //
-    // If this is an attach-routed link, propagate the flow data downrange.
-    // Note that the credit value is incremental.
-    //
-    if (link->connected_link) {
+    if (link->core_endpoint) {
+        qdrc_endpoint_do_flow_CT(core, link->core_endpoint, credit, drain);
+    } else if (link->connected_link) {
+        //
+        // If this is an attach-routed link, propagate the flow data downrange.
+        // Note that the credit value is incremental.
+        //
         qdr_link_t *clink = link->connected_link;
 
         if (clink->link_direction == QD_INCOMING)
@@ -983,6 +985,13 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
 
     bool more = action->args.connection.more;
 
+    //
+    // If this link has a core_endpoint, direct deliveries to that endpoint.
+    //
+    if (!!link->core_endpoint) {
+        qdrc_endpoint_do_deliver_CT(core, link->core_endpoint, dlv);
+        return;
+    }
 
     if (link->connected_link) {
         if (link->link_direction == QD_INCOMING)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 4c2cf11..c54e9d4 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -118,6 +118,7 @@ foreach(py_test_module
     system_tests_bad_configuration
     system_tests_ssl
     system_tests_edge_router
+    system_tests_core_endpoint
     ${SYSTEM_TESTS_HTTP}
     ${CONSOLE_TEST}
     )

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/tests/run_unit_tests.c
----------------------------------------------------------------------
diff --git a/tests/run_unit_tests.c b/tests/run_unit_tests.c
index 3434a90..77b9845 100644
--- a/tests/run_unit_tests.c
+++ b/tests/run_unit_tests.c
@@ -41,7 +41,7 @@ int main(int argc, char** argv)
     int result = 0;
 
     // Call qd_dispatch() first initialize allocator used by other tests.
-    qd_dispatch_t *qd = qd_dispatch(0);
+    qd_dispatch_t *qd = qd_dispatch(0, false);
 
     qd_dispatch_validate_config(argv[1]);
     if (qd_error_code()) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index 5cff90c..7794d61 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -325,7 +325,7 @@ class Qdrouterd(Process):
             self.defaults()
             return "".join(["%s {\n%s}\n"%(n, props(p, 1)) for n, p in self])
 
-    def __init__(self, name=None, config=Config(), pyinclude=None, wait=True, perform_teardown=True):
+    def __init__(self, name=None, config=Config(), pyinclude=None, wait=True, perform_teardown=True, cl_args=[]):
         """
         @param name: name used for for output files, default to id from config.
         @param config: router configuration
@@ -339,7 +339,7 @@ class Qdrouterd(Process):
         if not default_log:
             config.append(
                 ('log', {'module':'DEFAULT', 'enable':'trace+', 'includeSource': 'true', 'outputFile':name+'.log'}))
-        args = ['qdrouterd', '-c', config.write(name)]
+        args = ['qdrouterd', '-c', config.write(name)] + cl_args
         env_home = os.environ.get('QPID_DISPATCH_HOME')
         if pyinclude:
             args += ['-I', pyinclude]

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/tests/system_tests_core_endpoint.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_core_endpoint.py b/tests/system_tests_core_endpoint.py
new file mode 100644
index 0000000..14f4fbf
--- /dev/null
+++ b/tests/system_tests_core_endpoint.py
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest2 as unittest
+from proton import Message, Timeout
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+from qpid_dispatch_internal.compat import UNICODE
+from qpid_dispatch.management.client import Node
+
+
+class RouterTest(TestCase):
+    """System tests for the core-endpoint test hooks.
+
+    A single router is started with the "-T" command line argument and each
+    test attaches links to one of the "org.apache.qpid.dispatch.router/test/..."
+    addresses on that router.
+    """
+
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(RouterTest, cls).setUpClass()
+
+        def router(name, connection, args=[]):
+            # Build the configuration for one router, start it with the extra
+            # command line arguments in args, and record it in cls.routers.
+
+            config = [
+                ('router', {'mode': 'interior', 'id': name}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+                ('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),
+                connection
+            ]
+
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=args))
+
+        cls.routers = []
+        # Note: this is a local variable; the class attribute of the same name stays None.
+        inter_router_port = cls.tester.get_port()
+
+        router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}), ["-T"])
+
+
+    def test_01_denied_link(self):
+        """Both a receiver and a sender to the deny address must be refused."""
+        test = DenyLinkTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/deny")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_02_discard_deliveries(self):
+        """A delivery sent to the discard address must be rejected."""
+        test = DiscardTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/discard")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_03_presettled_source(self):
+        """Receive 300 deliveries from source_ps and expect all 300 to be settled."""
+        test = SourceTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/source_ps", 300, 300)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_04_unsettled_source(self):
+        """Receive 300 deliveries from source and expect none of them to be settled."""
+        test = SourceTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/source", 300, 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+class Timeout(object):
+    """Timer-task handler that forwards timer expiry to a test's timeout() method.
+
+    NOTE(review): this class has the same name as the Timeout imported from
+    proton at the top of the module and replaces it in the module namespace.
+    """
+
+    def __init__(self, parent):
+        # parent is the test object whose timeout() is called when the timer fires
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        """Notify the parent test that its timer has fired."""
+        self.parent.timeout()
+
+
+class DenyLinkTest(MessagingHandler):
+    def __init__(self, host, address):
+        super(DenyLinkTest, self).__init__(prefetch = 0)
+        self.host      = host
+        self.address   = address
+
+        self.conn     = None
+        self.error    = None
+        self.receiver = None
+        self.sender   = None
+        self.receiver_failed = False
+        self.sender_failed   = False
+
+    def timeout(self):
+        self.error = "Timeout Expired: receiver_failed=%s sender_failed=%s" %\
+                     ("yes" if self.receiver_failed else "no",
+                      "yes" if self.sender_failed else "no")
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer    = event.reactor.schedule(5.0, Timeout(self))
+        self.conn     = event.container.connect(self.host)
+        self.receiver = event.container.create_receiver(self.conn, self.address)
+        self.sender   = event.container.create_sender(self.conn, self.address)
+
+    def on_link_error(self, event):
+        if event.receiver == self.receiver:
+            self.receiver_failed = True
+        if event.sender == self.sender:
+            self.sender_failed = True
+
+        if self.receiver_failed and self.sender_failed:
+            self.conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
+class DiscardTest(MessagingHandler):
+    def __init__(self, host, address):
+        super(DiscardTest, self).__init__(prefetch = 0)
+        self.host      = host
+        self.address   = address
+
+        self.conn     = None
+        self.error    = None
+        self.sender   = None
+
+        self.count    = 300
+        self.sent     = 0
+        self.rejected = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_sent=%d n_rejected=%d" % (self.sent, self.rejected)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer  = event.reactor.schedule(5.0, Timeout(self))
+        self.conn   = event.container.connect(self.host)
+        self.sender = event.container.create_sender(self.conn, self.address)
+
+    def on_sendable(self, event):
+        while self.sender.credit > 0 and self.sent < self.count:
+            msg = Message(body="Discard Test")
+            self.sender.send(msg)
+            self.sent += 1
+
+    def on_rejected(self, event):
+        self.rejected += 1
+        self.conn.close()
+        self.timer.cancel()
+
+    def on_link_error(self, event):
+        if event.receiver == self.receiver:
+            self.receiver_failed = True
+        if event.sender == self.sender:
+            self.sender_failed = True
+
+        if self.receiver_failed and self.sender_failed:
+            self.conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
+class SourceTest(MessagingHandler):
+    def __init__(self, host, address, count, expected_ps):
+        super(SourceTest, self).__init__(prefetch = 0)
+        self.host        = host
+        self.address     = address
+        self.expected_ps = expected_ps
+
+        self.conn     = None
+        self.error    = None
+        self.receiver = None
+
+        self.count          = count
+        self.n_credit_given = 0
+        self.n_rcvd         = 0
+        self.n_rcvd_ps      = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_rcvd=%d" % (self.n_rcvd)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer    = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn     = event.container.connect(self.host)
+        self.receiver = event.container.create_receiver(self.conn, self.address)
+        self.receiver.flow(3)
+        self.n_credit_given = 3
+
+    def on_message(self, event):
+        dlv = event.delivery
+        if dlv.settled:
+            self.n_rcvd_ps += 1
+        self.n_rcvd += 1
+        if self.n_rcvd == self.count:
+            self.conn.close()
+            self.timer.cancel()
+            if self.n_rcvd_ps != self.expected_ps:
+                self.error = "Received %d deliveries, %d were settled (expected %d)" %\
+                             (self.n_rcvd, self.n_rcvd_ps, self.expected_ps)
+
+        elif self.n_rcvd == self.n_credit_given:
+            self.receiver.flow(5)
+            self.n_credit_given += 5
+
+    def run(self):
+        Container(self).run()
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org