You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/27 21:34:29 UTC

qpid-dispatch git commit: DISPATCH-1133 - Added the API for Core Modules. Refactored test-hooks as a core module.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master e3e201652 -> fac8f7afb


DISPATCH-1133 - Added the API for Core Modules.  Refactored test-hooks as a core module.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fac8f7af
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fac8f7af
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fac8f7af

Branch: refs/heads/master
Commit: fac8f7afbc80067b71d79276bd7b81adbf864f33
Parents: e3e2016
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Sep 27 17:33:16 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Sep 27 17:33:16 2018 -0400

----------------------------------------------------------------------
 src/CMakeLists.txt                        |   4 +-
 src/router_core/core_test_hooks.c         | 408 ------------------------
 src/router_core/core_test_hooks.h         |  28 --
 src/router_core/module.h                  |  55 ++++
 src/router_core/modules/core_test_hooks.c | 421 +++++++++++++++++++++++++
 src/router_core/router_core_thread.c      |  46 ++-
 6 files changed, 522 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c3d35b9..f521fc5 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -39,11 +39,13 @@ add_custom_command (
 if(NOT LIBWEBSOCKETS_FOUND)
 include_directories(
   ${CMAKE_CURRENT_SOURCE_DIR}
+  ${CMAKE_CURRENT_SOURCE_DIR}/router_core
   ${CMAKE_CURRENT_BINARY_DIR}
   )
 else()
 include_directories(
   ${CMAKE_CURRENT_SOURCE_DIR}
+  ${CMAKE_CURRENT_SOURCE_DIR}/router_core
   ${CMAKE_CURRENT_BINARY_DIR}
   ${LIBWEBSOCKETS_INCLUDE_DIRS}
   )
@@ -85,7 +87,6 @@ set(qpid_dispatch_SOURCES
   router_core/agent_router.c
   router_core/connections.c
   router_core/core_link_endpoint.c
-  router_core/core_test_hooks.c
   router_core/edge_control.c
   router_core/error.c
   router_core/exchange_bindings.c
@@ -98,6 +99,7 @@ set(qpid_dispatch_SOURCES
   router_core/terminus.c
   router_core/transfer.c
   router_core/core_timer.c
+  router_core/modules/core_test_hooks.c
   router_node.c
   router_pynode.c
   schema_enum.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/router_core/core_test_hooks.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_test_hooks.c b/src/router_core/core_test_hooks.c
deleted file mode 100644
index 05cb75d..0000000
--- a/src/router_core/core_test_hooks.c
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qpid/dispatch/ctools.h"
-#include "qpid/dispatch/message.h"
-#include "qpid/dispatch/compose.h"
-#include "core_test_hooks.h"
-#include "core_link_endpoint.h"
-#include <stdio.h>
-#include <inttypes.h>
-
-typedef enum {
-    TEST_NODE_ECHO,
-    TEST_NODE_DENY,
-    TEST_NODE_SINK,
-    TEST_NODE_SOURCE,
-    TEST_NODE_SOURCE_PS,
-    TEST_NODE_DISCARD
-} test_node_behavior_t;
-
-typedef struct test_node_t test_node_t;
-
-typedef struct test_endpoint_t {
-    DEQ_LINKS(struct test_endpoint_t);
-    test_node_t         *node;
-    qdrc_endpoint_t     *ep;
-    qdr_delivery_list_t  deliveries;
-    int                  credit;
-    bool                 in_action_list;
-    bool                 detached;
-} test_endpoint_t;
-
-DEQ_DECLARE(test_endpoint_t, test_endpoint_list_t);
-
-struct test_node_t {
-    qdr_core_t           *core;
-    test_node_behavior_t  behavior;
-    qdrc_endpoint_desc_t *desc;
-    test_endpoint_list_t  in_links;
-    test_endpoint_list_t  out_links;
-};
-
-static test_node_t *echo_node;
-static test_node_t *deny_node;
-static test_node_t *sink_node;
-static test_node_t *source_node;
-static test_node_t *source_ps_node;
-static test_node_t *discard_node;
-
-
-static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard);
-
-
-static void source_send(test_endpoint_t *ep, bool presettled)
-{
-    static uint32_t      sequence = 0;
-    static char          stringbuf[100];
-    qdr_delivery_t      *dlv;
-    qd_message_t        *msg   = qd_message();
-    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
-
-    sprintf(stringbuf, "Sequence: %"PRIu32, sequence);
-
-    qd_compose_start_list(field);
-    qd_compose_insert_bool(field, 0);     // durable
-    qd_compose_end_list(field);
-
-    field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
-    qd_compose_start_list(field);
-    qd_compose_insert_null(field);        // message-id
-    qd_compose_end_list(field);
-
-    field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
-    qd_compose_start_map(field);
-    qd_compose_insert_symbol(field, "sequence");
-    qd_compose_insert_uint(field, sequence++);
-    qd_compose_end_map(field);
-
-    field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
-    qd_compose_insert_string(field, stringbuf);
-
-    dlv = qdrc_endpoint_delivery_CT(ep->node->core, ep->ep, msg);
-    qd_message_compose_2(msg, field);
-    qd_compose_free(field);
-    qdrc_endpoint_send_CT(ep->node->core, ep->ep, dlv, presettled);
-
-    if (--ep->credit > 0) {
-        qdr_action_t *action = qdr_action(endpoint_action, "test_hooks_endpoint_action");
-        action->args.general.context_1 = (void*) ep;
-        ep->in_action_list = true;
-        qdr_action_enqueue(ep->node->core, action);
-    }
-}
-
-
-static void free_endpoint(test_endpoint_t *ep)
-{
-    test_node_t *node = ep->node;
-
-    if (qdrc_endpoint_get_direction_CT(ep->ep) == QD_INCOMING)
-        DEQ_REMOVE(node->in_links, ep);
-    else
-        DEQ_REMOVE(node->out_links, ep);
-    free(ep);
-}
-
-
-static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
-    if (discard)
-        return;
-
-    test_endpoint_t *ep = (test_endpoint_t*) action->args.general.context_1;
-
-    ep->in_action_list = false;
-    if (ep->detached) {
-        free_endpoint(ep);
-        return;
-    }
-
-    switch (ep->node->behavior) {
-    case TEST_NODE_DENY :
-    case TEST_NODE_SINK :
-    case TEST_NODE_DISCARD :
-
-    case TEST_NODE_SOURCE :
-        source_send(ep, false);
-        break;
-
-    case TEST_NODE_SOURCE_PS :
-        source_send(ep, true);
-        break;
-
-    case TEST_NODE_ECHO :
-        break;
-    }
-}
-
-
-static bool first_attach(void             *bind_context,
-                         qdrc_endpoint_t  *endpoint,
-                         void            **link_context,
-                         qdr_error_t     **error)
-{
-    test_node_t     *node     = (test_node_t*) bind_context;
-    test_endpoint_t *test_ep  = 0;
-    bool             incoming = qdrc_endpoint_get_direction_CT(endpoint) == QD_INCOMING;
-
-    switch (node->behavior) {
-    case TEST_NODE_DENY :
-        *error = qdr_error("qd:forbidden", "Connectivity to the deny node is forbidden");
-        return false;
-
-    case TEST_NODE_ECHO :
-        break;
-
-    case TEST_NODE_SINK :
-        if (incoming) {
-            qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
-        } else {
-            *error = qdr_error("qd:forbidden", "Sink function only accepts incoming links");
-            return false;
-        }
-        break;
-
-    case TEST_NODE_SOURCE :
-    case TEST_NODE_SOURCE_PS :
-        if (incoming) {
-            *error = qdr_error("qd:forbidden", "Source function only accepts outgoing links");
-            return false;
-        }
-        break;
-
-    case TEST_NODE_DISCARD :
-        if (incoming) {
-            qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
-        } else {
-            *error = qdr_error("qd:forbidden", "Discard function only accepts incoming links");
-            return false;
-        }
-        break;
-    }
-
-    test_ep = NEW(test_endpoint_t);
-    ZERO(test_ep);
-    test_ep->node = node;
-    test_ep->ep   = endpoint;
-    *link_context = test_ep;
-
-    if (incoming)
-        DEQ_INSERT_TAIL(node->in_links, test_ep);
-    else
-        DEQ_INSERT_TAIL(node->out_links, test_ep);
-
-    return true;
-}
-
-
-static void second_attach(void           *link_context,
-                          qdr_terminus_t *remote_source,
-                          qdr_terminus_t *remote_target)
-{
-}
-
-
-static void flow(void *link_context,
-                 int   available_credit,
-                 bool  drain)
-{
-    test_endpoint_t *ep = (test_endpoint_t*) link_context;
-    if (available_credit == 0)
-        return;
-
-    ep->credit = available_credit;
-
-    switch (ep->node->behavior) {
-    case TEST_NODE_DENY :
-    case TEST_NODE_SINK :
-    case TEST_NODE_DISCARD :
-        break;
-
-    case TEST_NODE_SOURCE :
-        source_send(ep, false);
-        break;
-
-    case TEST_NODE_SOURCE_PS :
-        source_send(ep, true);
-        break;
-
-    case TEST_NODE_ECHO :
-        break;
-    }
-}
-
-
-static void update(void           *link_context,
-                   qdr_delivery_t *delivery,
-                   bool            settled,
-                   uint64_t        disposition)
-{
-}
-
-
-static void transfer(void           *link_context,
-                     qdr_delivery_t *delivery,
-                     qd_message_t   *message)
-{
-    test_endpoint_t *ep = (test_endpoint_t*) link_context;
-
-    if (!qd_message_receive_complete(message))
-        return;
-
-    switch (ep->node->behavior) {
-    case TEST_NODE_DENY :
-    case TEST_NODE_SOURCE :
-    case TEST_NODE_SOURCE_PS :
-        assert(false); // Can't get here.  Link should not have been opened
-        break;
-
-    case TEST_NODE_ECHO :
-        break;
-
-    case TEST_NODE_SINK :
-        qdrc_endpoint_settle_CT(ep->node->core, delivery, PN_ACCEPTED);
-        qdrc_endpoint_flow_CT(ep->node->core, ep->ep, 1, false);
-        break;
-
-    case TEST_NODE_DISCARD :
-        qdrc_endpoint_settle_CT(ep->node->core, delivery, PN_REJECTED);
-        qdrc_endpoint_flow_CT(ep->node->core, ep->ep, 1, false);
-        break;
-    }
-}
-
-
-static void detach(void        *link_context,
-                   qdr_error_t *error)
-{
-    test_endpoint_t *ep = (test_endpoint_t*) link_context;
-
-    if (ep->in_action_list) {
-        ep->detached = true;
-    } else {
-        free_endpoint(ep);
-    }
-}
-
-
-static void cleanup(void *link_context)
-{
-}
-
-
-static qdrc_endpoint_desc_t descriptor = {first_attach, second_attach, flow, update, transfer, detach, cleanup};
-
-
-static void qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core)
-{
-    char *echo_address       = "org.apache.qpid.dispatch.router/test/echo";
-    char *deny_address       = "org.apache.qpid.dispatch.router/test/deny";
-    char *sink_address       = "org.apache.qpid.dispatch.router/test/sink";
-    char *source_address     = "org.apache.qpid.dispatch.router/test/source";
-    char *source_ps_address  = "org.apache.qpid.dispatch.router/test/source_ps";
-    char *discard_address    = "org.apache.qpid.dispatch.router/test/discard";
-
-    echo_node       = NEW(test_node_t);
-    deny_node       = NEW(test_node_t);
-    sink_node       = NEW(test_node_t);
-    source_node     = NEW(test_node_t);
-    source_ps_node  = NEW(test_node_t);
-    discard_node    = NEW(test_node_t);
-
-    echo_node->core     = core;
-    echo_node->behavior = TEST_NODE_ECHO;
-    echo_node->desc     = &descriptor;
-    DEQ_INIT(echo_node->in_links);
-    DEQ_INIT(echo_node->out_links);
-    qdrc_endpoint_bind_mobile_address_CT(core, echo_address, '0', &descriptor, echo_node);
-
-    deny_node->core     = core;
-    deny_node->behavior = TEST_NODE_DENY;
-    deny_node->desc     = &descriptor;
-    DEQ_INIT(deny_node->in_links);
-    DEQ_INIT(deny_node->out_links);
-    qdrc_endpoint_bind_mobile_address_CT(core, deny_address, '0', &descriptor, deny_node);
-
-    sink_node->core     = core;
-    sink_node->behavior = TEST_NODE_SINK;
-    sink_node->desc     = &descriptor;
-    DEQ_INIT(sink_node->in_links);
-    DEQ_INIT(sink_node->out_links);
-    qdrc_endpoint_bind_mobile_address_CT(core, sink_address, '0', &descriptor, sink_node);
-
-    source_node->core     = core;
-    source_node->behavior = TEST_NODE_SOURCE;
-    source_node->desc     = &descriptor;
-    DEQ_INIT(source_node->in_links);
-    DEQ_INIT(source_node->out_links);
-    qdrc_endpoint_bind_mobile_address_CT(core, source_address, '0', &descriptor, source_node);
-
-    source_ps_node->core     = core;
-    source_ps_node->behavior = TEST_NODE_SOURCE_PS;
-    source_ps_node->desc     = &descriptor;
-    DEQ_INIT(source_ps_node->in_links);
-    DEQ_INIT(source_ps_node->out_links);
-    qdrc_endpoint_bind_mobile_address_CT(core, source_ps_address, '0', &descriptor, source_ps_node);
-
-    discard_node->core     = core;
-    discard_node->behavior = TEST_NODE_DISCARD;
-    discard_node->desc     = &descriptor;
-    DEQ_INIT(discard_node->in_links);
-    DEQ_INIT(discard_node->out_links);
-    qdrc_endpoint_bind_mobile_address_CT(core, discard_address, '0', &descriptor, discard_node);
-}
-
-
-static void qdrc_test_hooks_core_endpoint_finalize(qdr_core_t *core)
-{
-    free(echo_node);
-    free(deny_node);
-    free(sink_node);
-    free(source_node);
-    free(source_ps_node);
-    free(discard_node);
-}
-
-
-void qdrc_test_hooks_init_CT(qdr_core_t *core)
-{
-    //
-    // Exit if the test hooks are not enabled (by the --test-hooks command line option)
-    //
-    if (!core->qd->test_hooks)
-        return;
-
-    qd_log(core->log, QD_LOG_INFO, "Core thread system test hooks enabled");
-
-    qdrc_test_hooks_core_endpoint_setup(core);
-}
-
-
-void qdrc_test_hooks_final_CT(qdr_core_t *core)
-{
-    //
-    // Exit if the test hooks are not enabled (by the --test-hooks command line option)
-    //
-    if (!core->qd->test_hooks)
-        return;
-
-    qdrc_test_hooks_core_endpoint_finalize(core);
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/router_core/core_test_hooks.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_test_hooks.h b/src/router_core/core_test_hooks.h
deleted file mode 100644
index 7be20f3..0000000
--- a/src/router_core/core_test_hooks.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef qd_router_core_test_hooks
-#define qd_router_core_test_hooks 1
-
-#include "router_core_private.h"
-
-void qdrc_test_hooks_init_CT(qdr_core_t *core);
-void qdrc_test_hooks_final_CT(qdr_core_t *core);
-
-#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/router_core/module.h
----------------------------------------------------------------------
diff --git a/src/router_core/module.h b/src/router_core/module.h
new file mode 100644
index 0000000..05b9a6f
--- /dev/null
+++ b/src/router_core/module.h
@@ -0,0 +1,55 @@
+#ifndef qd_router_core_module
+#define qd_router_core_module 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * Callback to initialize a core module at core thread startup
+ *
+ * @param core Pointer to the core object
+ * @param module_context [out] Returned module context
+ */
+typedef void (*qdrc_module_init_t) (qdr_core_t *core, void **module_context);
+
+
+/**
+ * Callback to finalize a core module at core thread shutdown
+ *
+ * @param module_context The context returned by the module during the on_init call
+ */
+typedef void (*qdrc_module_final_t) (void *module_context);
+
+
+/**
+ * Declaration of a core module
+ *
+ * A module must declare itself by invoking the QDR_CORE_MODULE_DECLARE macro in its body.
+ *
+ * @param name A null-terminated literal string naming the module
+ * @param on_init Pointer to a function for module initialization, called at core thread startup
+ * @param on_final Pointer to a function for module finalization, called at core thread shutdown
+ */
+#define QDR_CORE_MODULE_DECLARE(name,on_init,on_final)   \
+    static void modstart() __attribute__((constructor)); \
+    void modstart() { qdr_register_core_module(name, on_init, on_final); }
+void qdr_register_core_module(const char *name, qdrc_module_init_t on_init, qdrc_module_final_t on_final);
+
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/router_core/modules/core_test_hooks.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/core_test_hooks.c b/src/router_core/modules/core_test_hooks.c
new file mode 100644
index 0000000..eaa4407
--- /dev/null
+++ b/src/router_core/modules/core_test_hooks.c
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/message.h"
+#include "qpid/dispatch/compose.h"
+#include "core_link_endpoint.h"
+#include "module.h"
+#include <stdio.h>
+#include <inttypes.h>
+
+typedef enum {
+    TEST_NODE_ECHO,
+    TEST_NODE_DENY,
+    TEST_NODE_SINK,
+    TEST_NODE_SOURCE,
+    TEST_NODE_SOURCE_PS,
+    TEST_NODE_DISCARD
+} test_node_behavior_t;
+
+typedef struct test_module_t test_module_t;
+typedef struct test_node_t   test_node_t;
+
+typedef struct test_endpoint_t {
+    DEQ_LINKS(struct test_endpoint_t);
+    test_node_t         *node;
+    qdrc_endpoint_t     *ep;
+    qdr_delivery_list_t  deliveries;
+    int                  credit;
+    bool                 in_action_list;
+    bool                 detached;
+} test_endpoint_t;
+
+DEQ_DECLARE(test_endpoint_t, test_endpoint_list_t);
+
+struct test_node_t {
+    qdr_core_t           *core;
+    test_module_t        *module;
+    test_node_behavior_t  behavior;
+    qdrc_endpoint_desc_t *desc;
+    test_endpoint_list_t  in_links;
+    test_endpoint_list_t  out_links;
+};
+
+struct test_module_t {
+    qdr_core_t  *core;
+    test_node_t *echo_node;
+    test_node_t *deny_node;
+    test_node_t *sink_node;
+    test_node_t *source_node;
+    test_node_t *source_ps_node;
+    test_node_t *discard_node;
+};
+
+
+static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+
+static void source_send(test_endpoint_t *ep, bool presettled)
+{
+    static uint32_t      sequence = 0;
+    static char          stringbuf[100];
+    qdr_delivery_t      *dlv;
+    qd_message_t        *msg   = qd_message();
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+
+    sprintf(stringbuf, "Sequence: %"PRIu32, sequence);
+
+    qd_compose_start_list(field);
+    qd_compose_insert_bool(field, 0);     // durable
+    qd_compose_end_list(field);
+
+    field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
+    qd_compose_start_list(field);
+    qd_compose_insert_null(field);        // message-id
+    qd_compose_end_list(field);
+
+    field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
+    qd_compose_start_map(field);
+    qd_compose_insert_symbol(field, "sequence");
+    qd_compose_insert_uint(field, sequence++);
+    qd_compose_end_map(field);
+
+    field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
+    qd_compose_insert_string(field, stringbuf);
+
+    dlv = qdrc_endpoint_delivery_CT(ep->node->core, ep->ep, msg);
+    qd_message_compose_2(msg, field);
+    qd_compose_free(field);
+    qdrc_endpoint_send_CT(ep->node->core, ep->ep, dlv, presettled);
+
+    if (--ep->credit > 0) {
+        qdr_action_t *action = qdr_action(endpoint_action, "test_hooks_endpoint_action");
+        action->args.general.context_1 = (void*) ep;
+        ep->in_action_list = true;
+        qdr_action_enqueue(ep->node->core, action);
+    }
+}
+
+
+static void free_endpoint(test_endpoint_t *ep)
+{
+    test_node_t *node = ep->node;
+
+    if (qdrc_endpoint_get_direction_CT(ep->ep) == QD_INCOMING)
+        DEQ_REMOVE(node->in_links, ep);
+    else
+        DEQ_REMOVE(node->out_links, ep);
+    free(ep);
+}
+
+// Deferred action queued by source_send(): sends the next message while the endpoint still has credit.
+static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)  // NOTE(review): presumably set when the core drops queued actions; endpoint is left untouched
+        return;
+
+    test_endpoint_t *ep = (test_endpoint_t*) action->args.general.context_1;
+    ep->in_action_list = false;
+    if (ep->detached) {  // detach() arrived while this action was queued; do the deferred free now
+        free_endpoint(ep);
+        return;
+    }
+
+    switch (ep->node->behavior) {
+    case TEST_NODE_DENY :
+    case TEST_NODE_SINK :
+    case TEST_NODE_DISCARD :
+        break;  // non-source behaviors never send (matches flow()); previously fell through into source_send
+
+    case TEST_NODE_SOURCE :
+        source_send(ep, false);
+        break;
+
+    case TEST_NODE_SOURCE_PS :
+        source_send(ep, true);
+        break;
+
+    case TEST_NODE_ECHO :
+        break;
+    }
+}
+
+
+static bool first_attach(void             *bind_context,
+                         qdrc_endpoint_t  *endpoint,
+                         void            **link_context,
+                         qdr_error_t     **error)
+{
+    test_node_t     *node     = (test_node_t*) bind_context;
+    test_endpoint_t *test_ep  = 0;
+    bool             incoming = qdrc_endpoint_get_direction_CT(endpoint) == QD_INCOMING;
+
+    switch (node->behavior) {
+    case TEST_NODE_DENY :
+        *error = qdr_error("qd:forbidden", "Connectivity to the deny node is forbidden");
+        return false;
+
+    case TEST_NODE_ECHO :
+        break;
+
+    case TEST_NODE_SINK :
+        if (incoming) {
+            qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
+        } else {
+            *error = qdr_error("qd:forbidden", "Sink function only accepts incoming links");
+            return false;
+        }
+        break;
+
+    case TEST_NODE_SOURCE :
+    case TEST_NODE_SOURCE_PS :
+        if (incoming) {
+            *error = qdr_error("qd:forbidden", "Source function only accepts outgoing links");
+            return false;
+        }
+        break;
+
+    case TEST_NODE_DISCARD :
+        if (incoming) {
+            qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
+        } else {
+            *error = qdr_error("qd:forbidden", "Discard function only accepts incoming links");
+            return false;
+        }
+        break;
+    }
+
+    test_ep = NEW(test_endpoint_t);
+    ZERO(test_ep);
+    test_ep->node = node;
+    test_ep->ep   = endpoint;
+    *link_context = test_ep;
+
+    if (incoming)
+        DEQ_INSERT_TAIL(node->in_links, test_ep);
+    else
+        DEQ_INSERT_TAIL(node->out_links, test_ep);
+
+    return true;
+}
+
+
+static void second_attach(void           *link_context,
+                          qdr_terminus_t *remote_source,
+                          qdr_terminus_t *remote_target)
+{
+}
+
+
+static void flow(void *link_context,
+                 int   available_credit,
+                 bool  drain)
+{
+    test_endpoint_t *ep = (test_endpoint_t*) link_context;
+    if (available_credit == 0)
+        return;
+
+    ep->credit = available_credit;
+
+    switch (ep->node->behavior) {
+    case TEST_NODE_DENY :
+    case TEST_NODE_SINK :
+    case TEST_NODE_DISCARD :
+        break;
+
+    case TEST_NODE_SOURCE :
+        source_send(ep, false);
+        break;
+
+    case TEST_NODE_SOURCE_PS :
+        source_send(ep, true);
+        break;
+
+    case TEST_NODE_ECHO :
+        break;
+    }
+}
+
+
+static void update(void           *link_context,
+                   qdr_delivery_t *delivery,
+                   bool            settled,
+                   uint64_t        disposition)
+{
+}
+
+
+static void transfer(void           *link_context,
+                     qdr_delivery_t *delivery,
+                     qd_message_t   *message)
+{
+    test_endpoint_t *ep = (test_endpoint_t*) link_context;
+
+    if (!qd_message_receive_complete(message))
+        return;
+
+    switch (ep->node->behavior) {
+    case TEST_NODE_DENY :
+    case TEST_NODE_SOURCE :
+    case TEST_NODE_SOURCE_PS :
+        assert(false); // Can't get here.  Link should not have been opened
+        break;
+
+    case TEST_NODE_ECHO :
+        break;
+
+    case TEST_NODE_SINK :
+        qdrc_endpoint_settle_CT(ep->node->core, delivery, PN_ACCEPTED);
+        qdrc_endpoint_flow_CT(ep->node->core, ep->ep, 1, false);
+        break;
+
+    case TEST_NODE_DISCARD :
+        qdrc_endpoint_settle_CT(ep->node->core, delivery, PN_REJECTED);
+        qdrc_endpoint_flow_CT(ep->node->core, ep->ep, 1, false);
+        break;
+    }
+}
+
+
+static void detach(void        *link_context,
+                   qdr_error_t *error)
+{
+    test_endpoint_t *ep = (test_endpoint_t*) link_context;
+
+    if (ep->in_action_list) {
+        ep->detached = true;
+    } else {
+        free_endpoint(ep);
+    }
+}
+
+
+static void cleanup(void *link_context)
+{
+}
+
+
+static qdrc_endpoint_desc_t descriptor = {first_attach, second_attach, flow, update, transfer, detach, cleanup};
+
+
+static test_module_t *qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core)
+{
+    char *echo_address       = "org.apache.qpid.dispatch.router/test/echo";
+    char *deny_address       = "org.apache.qpid.dispatch.router/test/deny";
+    char *sink_address       = "org.apache.qpid.dispatch.router/test/sink";
+    char *source_address     = "org.apache.qpid.dispatch.router/test/source";
+    char *source_ps_address  = "org.apache.qpid.dispatch.router/test/source_ps";
+    char *discard_address    = "org.apache.qpid.dispatch.router/test/discard";
+
+    test_module_t *module = NEW(test_module_t);
+
+    module->core           = core;
+    module->echo_node      = NEW(test_node_t);
+    module->deny_node      = NEW(test_node_t);
+    module->sink_node      = NEW(test_node_t);
+    module->source_node    = NEW(test_node_t);
+    module->source_ps_node = NEW(test_node_t);
+    module->discard_node   = NEW(test_node_t);
+
+    module->echo_node->core     = core;
+    module->echo_node->module   = module;
+    module->echo_node->behavior = TEST_NODE_ECHO;
+    module->echo_node->desc     = &descriptor;
+    DEQ_INIT(module->echo_node->in_links);
+    DEQ_INIT(module->echo_node->out_links);
+    qdrc_endpoint_bind_mobile_address_CT(core, echo_address, '0', &descriptor, module->echo_node);
+
+    module->deny_node->core     = core;
+    module->deny_node->module   = module;
+    module->deny_node->behavior = TEST_NODE_DENY;
+    module->deny_node->desc     = &descriptor;
+    DEQ_INIT(module->deny_node->in_links);
+    DEQ_INIT(module->deny_node->out_links);
+    qdrc_endpoint_bind_mobile_address_CT(core, deny_address, '0', &descriptor, module->deny_node);
+
+    module->sink_node->core     = core;
+    module->sink_node->module   = module;
+    module->sink_node->behavior = TEST_NODE_SINK;
+    module->sink_node->desc     = &descriptor;
+    DEQ_INIT(module->sink_node->in_links);
+    DEQ_INIT(module->sink_node->out_links);
+    qdrc_endpoint_bind_mobile_address_CT(core, sink_address, '0', &descriptor, module->sink_node);
+
+    module->source_node->core     = core;
+    module->source_node->module   = module;
+    module->source_node->behavior = TEST_NODE_SOURCE;
+    module->source_node->desc     = &descriptor;
+    DEQ_INIT(module->source_node->in_links);
+    DEQ_INIT(module->source_node->out_links);
+    qdrc_endpoint_bind_mobile_address_CT(core, source_address, '0', &descriptor, module->source_node);
+
+    module->source_ps_node->core     = core;
+    module->source_ps_node->module   = module;
+    module->source_ps_node->behavior = TEST_NODE_SOURCE_PS;
+    module->source_ps_node->desc     = &descriptor;
+    DEQ_INIT(module->source_ps_node->in_links);
+    DEQ_INIT(module->source_ps_node->out_links);
+    qdrc_endpoint_bind_mobile_address_CT(core, source_ps_address, '0', &descriptor, module->source_ps_node);
+
+    module->discard_node->core     = core;
+    module->discard_node->module   = module;
+    module->discard_node->behavior = TEST_NODE_DISCARD;
+    module->discard_node->desc     = &descriptor;
+    DEQ_INIT(module->discard_node->in_links);
+    DEQ_INIT(module->discard_node->out_links);
+    qdrc_endpoint_bind_mobile_address_CT(core, discard_address, '0', &descriptor, module->discard_node);
+
+    return module;
+}
+
+// Releases the six node contexts and the module record allocated by qdrc_test_hooks_core_endpoint_setup().
+static void qdrc_test_hooks_core_endpoint_finalize(test_module_t *module)
+{
+    free(module->echo_node);
+    free(module->deny_node);
+    free(module->sink_node);
+    free(module->source_node);
+    free(module->source_ps_node);
+    free(module->discard_node);
+    free(module);  // the module record itself was previously leaked; it must not be used after this call
+}
+
+static void qdrc_test_hooks_init_CT(qdr_core_t *core, void **module_context)
+{
+    //
+    // Exit if the test hooks are not enabled (by the --test-hooks command line option)
+    //
+    if (!core->qd->test_hooks) {
+        *module_context = 0;
+        return;
+    }
+
+    *module_context = qdrc_test_hooks_core_endpoint_setup(core);
+}
+
+
+static void qdrc_test_hooks_final_CT(void *module_context)
+{
+    if (!!module_context)
+        qdrc_test_hooks_core_endpoint_finalize(module_context);
+}
+
+
+QDR_CORE_MODULE_DECLARE("core_test_hooks", qdrc_test_hooks_init_CT, qdrc_test_hooks_final_CT)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index ae6bbdc..11525d1 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -18,7 +18,7 @@
  */
 
 #include "router_core_private.h"
-#include "core_test_hooks.h"
+#include "module.h"
 
 /**
  * Creates a thread that is dedicated to managing and using the routing table.
@@ -31,6 +31,28 @@
 ALLOC_DEFINE(qdr_action_t);
 
 
+typedef struct qdrc_module_t {
+    DEQ_LINKS(struct qdrc_module_t);
+    const char          *name;
+    qdrc_module_init_t   on_init;
+    qdrc_module_final_t  on_final;
+    void                *context;
+} qdrc_module_t;
+
+DEQ_DECLARE(qdrc_module_t, qdrc_module_list_t);
+static qdrc_module_list_t registered_modules = {0,0};
+
+void qdr_register_core_module(const char *name, qdrc_module_init_t on_init, qdrc_module_final_t on_final)
+{
+    qdrc_module_t *module = NEW(qdrc_module_t);
+    ZERO(module);
+    module->name     = name;
+    module->on_init  = on_init;
+    module->on_final = on_final;
+    DEQ_INSERT_TAIL(registered_modules, module);
+}
+
+
 static void qdr_activate_connections_CT(qdr_core_t *core)
 {
     qdr_connection_t *conn = DEQ_HEAD(core->connections_to_activate);
@@ -52,7 +74,16 @@ void *router_core_thread(void *arg)
     qdr_forwarder_setup_CT(core);
     qdr_route_table_setup_CT(core);
     qdr_agent_setup_CT(core);
-    qdrc_test_hooks_init_CT(core);
+
+    //
+    // Initialize registered modules
+    //
+    qdrc_module_t *module = DEQ_HEAD(registered_modules);
+    while (module) {
+        qd_log(core->log, QD_LOG_INFO, "Initializing core module: %s", module->name);
+        module->on_init(core, &module->context);
+        module = DEQ_NEXT(module);
+    }
 
     qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s", core->router_area, core->router_id);
     while (core->running) {
@@ -93,7 +124,16 @@ void *router_core_thread(void *arg)
         qdr_activate_connections_CT(core);
     }
 
-    qdrc_test_hooks_final_CT(core);
+    //
+    // Finalize registered modules
+    //
+    module = DEQ_TAIL(registered_modules);
+    while (module) {
+        qd_log(core->log, QD_LOG_INFO, "Finalizing core module: %s", module->name);
+        module->on_final(module->context);
+        module = DEQ_PREV(module);
+    }
+
     qd_log(core->log, QD_LOG_INFO, "Router Core thread exited");
     return 0;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org