You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/27 21:34:29 UTC
qpid-dispatch git commit: DISPATCH-1133 - Added the API for Core
Modules. Refactored test-hooks as a core module.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master e3e201652 -> fac8f7afb
DISPATCH-1133 - Added the API for Core Modules. Refactored test-hooks as a core module.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fac8f7af
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fac8f7af
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fac8f7af
Branch: refs/heads/master
Commit: fac8f7afbc80067b71d79276bd7b81adbf864f33
Parents: e3e2016
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Sep 27 17:33:16 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Sep 27 17:33:16 2018 -0400
----------------------------------------------------------------------
src/CMakeLists.txt | 4 +-
src/router_core/core_test_hooks.c | 408 ------------------------
src/router_core/core_test_hooks.h | 28 --
src/router_core/module.h | 55 ++++
src/router_core/modules/core_test_hooks.c | 421 +++++++++++++++++++++++++
src/router_core/router_core_thread.c | 46 ++-
6 files changed, 522 insertions(+), 440 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c3d35b9..f521fc5 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -39,11 +39,13 @@ add_custom_command (
if(NOT LIBWEBSOCKETS_FOUND)
include_directories(
${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/router_core
${CMAKE_CURRENT_BINARY_DIR}
)
else()
include_directories(
${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/router_core
${CMAKE_CURRENT_BINARY_DIR}
${LIBWEBSOCKETS_INCLUDE_DIRS}
)
@@ -85,7 +87,6 @@ set(qpid_dispatch_SOURCES
router_core/agent_router.c
router_core/connections.c
router_core/core_link_endpoint.c
- router_core/core_test_hooks.c
router_core/edge_control.c
router_core/error.c
router_core/exchange_bindings.c
@@ -98,6 +99,7 @@ set(qpid_dispatch_SOURCES
router_core/terminus.c
router_core/transfer.c
router_core/core_timer.c
+ router_core/modules/core_test_hooks.c
router_node.c
router_pynode.c
schema_enum.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/router_core/core_test_hooks.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_test_hooks.c b/src/router_core/core_test_hooks.c
deleted file mode 100644
index 05cb75d..0000000
--- a/src/router_core/core_test_hooks.c
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qpid/dispatch/ctools.h"
-#include "qpid/dispatch/message.h"
-#include "qpid/dispatch/compose.h"
-#include "core_test_hooks.h"
-#include "core_link_endpoint.h"
-#include <stdio.h>
-#include <inttypes.h>
-
-typedef enum {
- TEST_NODE_ECHO,
- TEST_NODE_DENY,
- TEST_NODE_SINK,
- TEST_NODE_SOURCE,
- TEST_NODE_SOURCE_PS,
- TEST_NODE_DISCARD
-} test_node_behavior_t;
-
-typedef struct test_node_t test_node_t;
-
-typedef struct test_endpoint_t {
- DEQ_LINKS(struct test_endpoint_t);
- test_node_t *node;
- qdrc_endpoint_t *ep;
- qdr_delivery_list_t deliveries;
- int credit;
- bool in_action_list;
- bool detached;
-} test_endpoint_t;
-
-DEQ_DECLARE(test_endpoint_t, test_endpoint_list_t);
-
-struct test_node_t {
- qdr_core_t *core;
- test_node_behavior_t behavior;
- qdrc_endpoint_desc_t *desc;
- test_endpoint_list_t in_links;
- test_endpoint_list_t out_links;
-};
-
-static test_node_t *echo_node;
-static test_node_t *deny_node;
-static test_node_t *sink_node;
-static test_node_t *source_node;
-static test_node_t *source_ps_node;
-static test_node_t *discard_node;
-
-
-static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard);
-
-
-static void source_send(test_endpoint_t *ep, bool presettled)
-{
- static uint32_t sequence = 0;
- static char stringbuf[100];
- qdr_delivery_t *dlv;
- qd_message_t *msg = qd_message();
- qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
-
- sprintf(stringbuf, "Sequence: %"PRIu32, sequence);
-
- qd_compose_start_list(field);
- qd_compose_insert_bool(field, 0); // durable
- qd_compose_end_list(field);
-
- field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
- qd_compose_start_list(field);
- qd_compose_insert_null(field); // message-id
- qd_compose_end_list(field);
-
- field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
- qd_compose_start_map(field);
- qd_compose_insert_symbol(field, "sequence");
- qd_compose_insert_uint(field, sequence++);
- qd_compose_end_map(field);
-
- field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
- qd_compose_insert_string(field, stringbuf);
-
- dlv = qdrc_endpoint_delivery_CT(ep->node->core, ep->ep, msg);
- qd_message_compose_2(msg, field);
- qd_compose_free(field);
- qdrc_endpoint_send_CT(ep->node->core, ep->ep, dlv, presettled);
-
- if (--ep->credit > 0) {
- qdr_action_t *action = qdr_action(endpoint_action, "test_hooks_endpoint_action");
- action->args.general.context_1 = (void*) ep;
- ep->in_action_list = true;
- qdr_action_enqueue(ep->node->core, action);
- }
-}
-
-
-static void free_endpoint(test_endpoint_t *ep)
-{
- test_node_t *node = ep->node;
-
- if (qdrc_endpoint_get_direction_CT(ep->ep) == QD_INCOMING)
- DEQ_REMOVE(node->in_links, ep);
- else
- DEQ_REMOVE(node->out_links, ep);
- free(ep);
-}
-
-
-static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
- if (discard)
- return;
-
- test_endpoint_t *ep = (test_endpoint_t*) action->args.general.context_1;
-
- ep->in_action_list = false;
- if (ep->detached) {
- free_endpoint(ep);
- return;
- }
-
- switch (ep->node->behavior) {
- case TEST_NODE_DENY :
- case TEST_NODE_SINK :
- case TEST_NODE_DISCARD :
-
- case TEST_NODE_SOURCE :
- source_send(ep, false);
- break;
-
- case TEST_NODE_SOURCE_PS :
- source_send(ep, true);
- break;
-
- case TEST_NODE_ECHO :
- break;
- }
-}
-
-
-static bool first_attach(void *bind_context,
- qdrc_endpoint_t *endpoint,
- void **link_context,
- qdr_error_t **error)
-{
- test_node_t *node = (test_node_t*) bind_context;
- test_endpoint_t *test_ep = 0;
- bool incoming = qdrc_endpoint_get_direction_CT(endpoint) == QD_INCOMING;
-
- switch (node->behavior) {
- case TEST_NODE_DENY :
- *error = qdr_error("qd:forbidden", "Connectivity to the deny node is forbidden");
- return false;
-
- case TEST_NODE_ECHO :
- break;
-
- case TEST_NODE_SINK :
- if (incoming) {
- qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
- } else {
- *error = qdr_error("qd:forbidden", "Sink function only accepts incoming links");
- return false;
- }
- break;
-
- case TEST_NODE_SOURCE :
- case TEST_NODE_SOURCE_PS :
- if (incoming) {
- *error = qdr_error("qd:forbidden", "Source function only accepts outgoing links");
- return false;
- }
- break;
-
- case TEST_NODE_DISCARD :
- if (incoming) {
- qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
- } else {
- *error = qdr_error("qd:forbidden", "Discard function only accepts incoming links");
- return false;
- }
- break;
- }
-
- test_ep = NEW(test_endpoint_t);
- ZERO(test_ep);
- test_ep->node = node;
- test_ep->ep = endpoint;
- *link_context = test_ep;
-
- if (incoming)
- DEQ_INSERT_TAIL(node->in_links, test_ep);
- else
- DEQ_INSERT_TAIL(node->out_links, test_ep);
-
- return true;
-}
-
-
-static void second_attach(void *link_context,
- qdr_terminus_t *remote_source,
- qdr_terminus_t *remote_target)
-{
-}
-
-
-static void flow(void *link_context,
- int available_credit,
- bool drain)
-{
- test_endpoint_t *ep = (test_endpoint_t*) link_context;
- if (available_credit == 0)
- return;
-
- ep->credit = available_credit;
-
- switch (ep->node->behavior) {
- case TEST_NODE_DENY :
- case TEST_NODE_SINK :
- case TEST_NODE_DISCARD :
- break;
-
- case TEST_NODE_SOURCE :
- source_send(ep, false);
- break;
-
- case TEST_NODE_SOURCE_PS :
- source_send(ep, true);
- break;
-
- case TEST_NODE_ECHO :
- break;
- }
-}
-
-
-static void update(void *link_context,
- qdr_delivery_t *delivery,
- bool settled,
- uint64_t disposition)
-{
-}
-
-
-static void transfer(void *link_context,
- qdr_delivery_t *delivery,
- qd_message_t *message)
-{
- test_endpoint_t *ep = (test_endpoint_t*) link_context;
-
- if (!qd_message_receive_complete(message))
- return;
-
- switch (ep->node->behavior) {
- case TEST_NODE_DENY :
- case TEST_NODE_SOURCE :
- case TEST_NODE_SOURCE_PS :
- assert(false); // Can't get here. Link should not have been opened
- break;
-
- case TEST_NODE_ECHO :
- break;
-
- case TEST_NODE_SINK :
- qdrc_endpoint_settle_CT(ep->node->core, delivery, PN_ACCEPTED);
- qdrc_endpoint_flow_CT(ep->node->core, ep->ep, 1, false);
- break;
-
- case TEST_NODE_DISCARD :
- qdrc_endpoint_settle_CT(ep->node->core, delivery, PN_REJECTED);
- qdrc_endpoint_flow_CT(ep->node->core, ep->ep, 1, false);
- break;
- }
-}
-
-
-static void detach(void *link_context,
- qdr_error_t *error)
-{
- test_endpoint_t *ep = (test_endpoint_t*) link_context;
-
- if (ep->in_action_list) {
- ep->detached = true;
- } else {
- free_endpoint(ep);
- }
-}
-
-
-static void cleanup(void *link_context)
-{
-}
-
-
-static qdrc_endpoint_desc_t descriptor = {first_attach, second_attach, flow, update, transfer, detach, cleanup};
-
-
-static void qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core)
-{
- char *echo_address = "org.apache.qpid.dispatch.router/test/echo";
- char *deny_address = "org.apache.qpid.dispatch.router/test/deny";
- char *sink_address = "org.apache.qpid.dispatch.router/test/sink";
- char *source_address = "org.apache.qpid.dispatch.router/test/source";
- char *source_ps_address = "org.apache.qpid.dispatch.router/test/source_ps";
- char *discard_address = "org.apache.qpid.dispatch.router/test/discard";
-
- echo_node = NEW(test_node_t);
- deny_node = NEW(test_node_t);
- sink_node = NEW(test_node_t);
- source_node = NEW(test_node_t);
- source_ps_node = NEW(test_node_t);
- discard_node = NEW(test_node_t);
-
- echo_node->core = core;
- echo_node->behavior = TEST_NODE_ECHO;
- echo_node->desc = &descriptor;
- DEQ_INIT(echo_node->in_links);
- DEQ_INIT(echo_node->out_links);
- qdrc_endpoint_bind_mobile_address_CT(core, echo_address, '0', &descriptor, echo_node);
-
- deny_node->core = core;
- deny_node->behavior = TEST_NODE_DENY;
- deny_node->desc = &descriptor;
- DEQ_INIT(deny_node->in_links);
- DEQ_INIT(deny_node->out_links);
- qdrc_endpoint_bind_mobile_address_CT(core, deny_address, '0', &descriptor, deny_node);
-
- sink_node->core = core;
- sink_node->behavior = TEST_NODE_SINK;
- sink_node->desc = &descriptor;
- DEQ_INIT(sink_node->in_links);
- DEQ_INIT(sink_node->out_links);
- qdrc_endpoint_bind_mobile_address_CT(core, sink_address, '0', &descriptor, sink_node);
-
- source_node->core = core;
- source_node->behavior = TEST_NODE_SOURCE;
- source_node->desc = &descriptor;
- DEQ_INIT(source_node->in_links);
- DEQ_INIT(source_node->out_links);
- qdrc_endpoint_bind_mobile_address_CT(core, source_address, '0', &descriptor, source_node);
-
- source_ps_node->core = core;
- source_ps_node->behavior = TEST_NODE_SOURCE_PS;
- source_ps_node->desc = &descriptor;
- DEQ_INIT(source_ps_node->in_links);
- DEQ_INIT(source_ps_node->out_links);
- qdrc_endpoint_bind_mobile_address_CT(core, source_ps_address, '0', &descriptor, source_ps_node);
-
- discard_node->core = core;
- discard_node->behavior = TEST_NODE_DISCARD;
- discard_node->desc = &descriptor;
- DEQ_INIT(discard_node->in_links);
- DEQ_INIT(discard_node->out_links);
- qdrc_endpoint_bind_mobile_address_CT(core, discard_address, '0', &descriptor, discard_node);
-}
-
-
-static void qdrc_test_hooks_core_endpoint_finalize(qdr_core_t *core)
-{
- free(echo_node);
- free(deny_node);
- free(sink_node);
- free(source_node);
- free(source_ps_node);
- free(discard_node);
-}
-
-
-void qdrc_test_hooks_init_CT(qdr_core_t *core)
-{
- //
- // Exit if the test hooks are not enabled (by the --test-hooks command line option)
- //
- if (!core->qd->test_hooks)
- return;
-
- qd_log(core->log, QD_LOG_INFO, "Core thread system test hooks enabled");
-
- qdrc_test_hooks_core_endpoint_setup(core);
-}
-
-
-void qdrc_test_hooks_final_CT(qdr_core_t *core)
-{
- //
- // Exit if the test hooks are not enabled (by the --test-hooks command line option)
- //
- if (!core->qd->test_hooks)
- return;
-
- qdrc_test_hooks_core_endpoint_finalize(core);
-}
-
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/router_core/core_test_hooks.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_test_hooks.h b/src/router_core/core_test_hooks.h
deleted file mode 100644
index 7be20f3..0000000
--- a/src/router_core/core_test_hooks.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef qd_router_core_test_hooks
-#define qd_router_core_test_hooks 1
-
-#include "router_core_private.h"
-
-void qdrc_test_hooks_init_CT(qdr_core_t *core);
-void qdrc_test_hooks_final_CT(qdr_core_t *core);
-
-#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/router_core/module.h
----------------------------------------------------------------------
diff --git a/src/router_core/module.h b/src/router_core/module.h
new file mode 100644
index 0000000..05b9a6f
--- /dev/null
+++ b/src/router_core/module.h
@@ -0,0 +1,55 @@
+#ifndef qd_router_core_module
+#define qd_router_core_module 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * Callback to initialize a core module at core thread startup
+ *
+ * @param core Pointer to the core object
+ * @param module_context [out] Returned module context
+ */
+typedef void (*qdrc_module_init_t) (qdr_core_t *core, void **module_context);
+
+
+/**
+ * Callback to finalize a core module at core thread shutdown
+ *
+ * @param module_context The context returned by the module during the on_init call
+ */
+typedef void (*qdrc_module_final_t) (void *module_context);
+
+
+/**
+ * Declaration of a core module
+ *
+ * A module must declare itself by invoking the QDR_CORE_MODULE_DECLARE macro in its body.
+ *
+ * @param name A null-terminated literal string naming the module
+ * @param on_init Pointer to a function for module initialization, called at core thread startup
+ * @param on_final Pointer to a function for module finalization, called at core thread shutdown
+ */
+// Expands to a file-local constructor function (run automatically at load time via
+// __attribute__((constructor))) that registers the module with the core.  The generated
+// function is always named 'modstart', so the macro can be used once per source file.
+#define QDR_CORE_MODULE_DECLARE(name,on_init,on_final)       \
+    static void modstart(void) __attribute__((constructor)); \
+    static void modstart(void) { qdr_register_core_module(name, on_init, on_final); }
+void qdr_register_core_module(const char *name, qdrc_module_init_t on_init, qdrc_module_final_t on_final);
+
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/router_core/modules/core_test_hooks.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/core_test_hooks.c b/src/router_core/modules/core_test_hooks.c
new file mode 100644
index 0000000..eaa4407
--- /dev/null
+++ b/src/router_core/modules/core_test_hooks.c
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/message.h"
+#include "qpid/dispatch/compose.h"
+#include "core_link_endpoint.h"
+#include "module.h"
+#include <stdio.h>
+#include <inttypes.h>
+
+// Behavior selector for a test node.  One shared endpoint descriptor serves every node;
+// its callbacks switch on this value.
+typedef enum {
+ TEST_NODE_ECHO, // links accepted in either direction; no traffic handling is implemented
+ TEST_NODE_DENY, // every attach is refused with qd:forbidden
+ TEST_NODE_SINK, // incoming links only; completed messages are settled PN_ACCEPTED
+ TEST_NODE_SOURCE, // outgoing links only; sends generated messages (presettled == false)
+ TEST_NODE_SOURCE_PS, // outgoing links only; sends generated messages (presettled == true)
+ TEST_NODE_DISCARD // incoming links only; completed messages are settled PN_REJECTED
+} test_node_behavior_t;
+
+typedef struct test_module_t test_module_t;
+typedef struct test_node_t test_node_t;
+
+// Per-link state, allocated in first_attach and released by free_endpoint.
+typedef struct test_endpoint_t {
+ DEQ_LINKS(struct test_endpoint_t);
+ test_node_t *node; // owning test node (set in first_attach)
+ qdrc_endpoint_t *ep; // core endpoint handle passed to first_attach
+ qdr_delivery_list_t deliveries; // NOTE(review): not referenced by any code in this file
+ int credit; // set from available_credit in flow(); decremented per message in source_send()
+ bool in_action_list; // true while an endpoint_action referencing this struct is enqueued
+ bool detached; // set by detach() when an action is pending; endpoint_action then frees the struct
+} test_endpoint_t;
+
+DEQ_DECLARE(test_endpoint_t, test_endpoint_list_t);
+
+// One test node per bound address; passed to the core as the bind context and received
+// back in first_attach.
+struct test_node_t {
+ qdr_core_t *core; // core object used for all qdrc_endpoint_*_CT calls
+ test_module_t *module; // back-pointer to the owning module instance
+ test_node_behavior_t behavior; // selects how the shared callbacks treat links on this node
+ qdrc_endpoint_desc_t *desc; // endpoint descriptor this node was bound with
+ test_endpoint_list_t in_links; // endpoints whose direction is QD_INCOMING
+ test_endpoint_list_t out_links; // endpoints of the other direction
+};
+
+// Module instance state: returned as the module context from the init callback and handed
+// back to the final callback.  Holds one node per test address.
+struct test_module_t {
+ qdr_core_t *core;
+ test_node_t *echo_node;
+ test_node_t *deny_node;
+ test_node_t *sink_node;
+ test_node_t *source_node;
+ test_node_t *source_ps_node;
+ test_node_t *discard_node;
+};
+
+
+static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+
+//
+// Compose one test message and send it on the given outgoing endpoint.
+//
+// The message carries a header (durable = false), properties (null message-id), an
+// application-properties map {"sequence": <n>} and an AMQP-value body "Sequence: <n>",
+// where <n> is a counter shared by all source endpoints.
+//
+// @param ep          Endpoint to send on; one unit of ep->credit is consumed
+// @param presettled  Passed through to qdrc_endpoint_send_CT
+//
+static void source_send(test_endpoint_t *ep, bool presettled)
+{
+    static uint32_t      sequence = 0;
+    static char          stringbuf[100];
+    qdr_delivery_t      *dlv;
+    qd_message_t        *msg   = qd_message();
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+
+    // Bounded formatting: never write past the end of stringbuf.
+    snprintf(stringbuf, sizeof(stringbuf), "Sequence: %"PRIu32, sequence);
+
+    qd_compose_start_list(field);
+    qd_compose_insert_bool(field, 0);     // durable
+    qd_compose_end_list(field);
+
+    field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
+    qd_compose_start_list(field);
+    qd_compose_insert_null(field);        // message-id
+    qd_compose_end_list(field);
+
+    field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
+    qd_compose_start_map(field);
+    qd_compose_insert_symbol(field, "sequence");
+    qd_compose_insert_uint(field, sequence++);
+    qd_compose_end_map(field);
+
+    field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
+    qd_compose_insert_string(field, stringbuf);
+
+    dlv = qdrc_endpoint_delivery_CT(ep->node->core, ep->ep, msg);
+    qd_message_compose_2(msg, field);
+    qd_compose_free(field);
+    qdrc_endpoint_send_CT(ep->node->core, ep->ep, dlv, presettled);
+
+    //
+    // If credit remains, schedule the next send as a separate core action instead of
+    // looping here.  in_action_list records that an action now references this endpoint
+    // so that detach() defers freeing it.
+    //
+    if (--ep->credit > 0) {
+        qdr_action_t *action = qdr_action(endpoint_action, "test_hooks_endpoint_action");
+        action->args.general.context_1 = (void*) ep;
+        ep->in_action_list = true;
+        qdr_action_enqueue(ep->node->core, action);
+    }
+}
+
+
+//
+// Unlink an endpoint from its node's list (chosen by link direction) and release it.
+//
+static void free_endpoint(test_endpoint_t *ep)
+{
+    const bool incoming = qdrc_endpoint_get_direction_CT(ep->ep) == QD_INCOMING;
+
+    if (incoming) {
+        DEQ_REMOVE(ep->node->in_links, ep);
+    } else {
+        DEQ_REMOVE(ep->node->out_links, ep);
+    }
+
+    free(ep);
+}
+
+
+//
+// Core action handler enqueued by source_send() to continue sending while credit remains.
+//
+// @param core     Core object (unused here)
+// @param action   Action whose args.general.context_1 is the test_endpoint_t
+// @param discard  If true the action is being dropped; return without touching the endpoint
+//
+static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)
+        return;
+
+    test_endpoint_t *ep = (test_endpoint_t*) action->args.general.context_1;
+
+    //
+    // The action is no longer pending.  If the link detached while it was queued,
+    // detach() deferred the free to this point.
+    //
+    ep->in_action_list = false;
+    if (ep->detached) {
+        free_endpoint(ep);
+        return;
+    }
+
+    switch (ep->node->behavior) {
+    case TEST_NODE_DENY :
+    case TEST_NODE_SINK :
+    case TEST_NODE_DISCARD :
+        // Non-source nodes never send; do not fall through into the source case.
+        break;
+
+    case TEST_NODE_SOURCE :
+        source_send(ep, false);
+        break;
+
+    case TEST_NODE_SOURCE_PS :
+        source_send(ep, true);
+        break;
+
+    case TEST_NODE_ECHO :
+        break;
+    }
+}
+
+
+//
+// First-attach callback shared by all test nodes.
+//
+// Applies the node's admission rule, and on acceptance allocates the per-link state,
+// returns it through link_context and files it on the node's in/out list.
+//
+// @param bind_context  The test_node_t supplied when the address was bound
+// @param endpoint      Core endpoint for the attaching link
+// @param link_context  [out] Per-link context (test_endpoint_t) on acceptance
+// @param error         [out] Error describing the refusal when false is returned
+// @return true if the link is accepted, false if it is refused
+//
+static bool first_attach(void             *bind_context,
+                         qdrc_endpoint_t  *endpoint,
+                         void            **link_context,
+                         qdr_error_t     **error)
+{
+    test_node_t *node     = (test_node_t*) bind_context;
+    const bool   incoming = qdrc_endpoint_get_direction_CT(endpoint) == QD_INCOMING;
+
+    switch (node->behavior) {
+    case TEST_NODE_DENY :
+        *error = qdr_error("qd:forbidden", "Connectivity to the deny node is forbidden");
+        return false;
+
+    case TEST_NODE_ECHO :
+        break;
+
+    case TEST_NODE_SINK :
+        if (!incoming) {
+            *error = qdr_error("qd:forbidden", "Sink function only accepts incoming links");
+            return false;
+        }
+        // Grant the sender an initial credit of one.
+        qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
+        break;
+
+    case TEST_NODE_SOURCE :
+    case TEST_NODE_SOURCE_PS :
+        if (incoming) {
+            *error = qdr_error("qd:forbidden", "Source function only accepts outgoing links");
+            return false;
+        }
+        break;
+
+    case TEST_NODE_DISCARD :
+        if (!incoming) {
+            *error = qdr_error("qd:forbidden", "Discard function only accepts incoming links");
+            return false;
+        }
+        // Grant the sender an initial credit of one.
+        qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
+        break;
+    }
+
+    test_endpoint_t *link = NEW(test_endpoint_t);
+    ZERO(link);
+    link->node    = node;
+    link->ep      = endpoint;
+    *link_context = link;
+
+    if (incoming) {
+        DEQ_INSERT_TAIL(node->in_links, link);
+    } else {
+        DEQ_INSERT_TAIL(node->out_links, link);
+    }
+
+    return true;
+}
+
+
+//
+// Second-attach callback: the test nodes take no action here.
+//
+static void second_attach(void           *link_context,
+                          qdr_terminus_t *remote_source,
+                          qdr_terminus_t *remote_target)
+{
+    // Intentionally empty.
+}
+
+
+//
+// Flow callback: record the credit offered by the peer and, on source nodes, start sending.
+//
+// @param link_context      The test_endpoint_t for the link
+// @param available_credit  Credit offered; zero is ignored
+// @param drain             Unused
+//
+static void flow(void *link_context,
+                 int   available_credit,
+                 bool  drain)
+{
+    if (available_credit == 0)
+        return;
+
+    test_endpoint_t *link = (test_endpoint_t*) link_context;
+    link->credit = available_credit;
+
+    // Only the two source behaviors react to credit; every other node ignores it.
+    const test_node_behavior_t behavior = link->node->behavior;
+    if (behavior == TEST_NODE_SOURCE) {
+        source_send(link, false);
+    } else if (behavior == TEST_NODE_SOURCE_PS) {
+        source_send(link, true);
+    }
+}
+
+
+//
+// Delivery-update callback: the test nodes ignore disposition/settlement updates.
+//
+static void update(void           *link_context,
+                   qdr_delivery_t *delivery,
+                   bool            settled,
+                   uint64_t        disposition)
+{
+    // Intentionally empty.
+}
+
+
+//
+// Settle a delivery with the given disposition and issue one more credit on the link.
+//
+static void settle_and_replenish(test_endpoint_t *link, qdr_delivery_t *delivery, uint64_t disposition)
+{
+    qdrc_endpoint_settle_CT(link->node->core, delivery, disposition);
+    qdrc_endpoint_flow_CT(link->node->core, link->ep, 1, false);
+}
+
+
+//
+// Transfer callback: handle a message arriving on an incoming link.
+//
+// Nothing is done until the message has been completely received.  Sink nodes then
+// accept the delivery and discard nodes reject it; both replenish one credit.
+//
+static void transfer(void           *link_context,
+                     qdr_delivery_t *delivery,
+                     qd_message_t   *message)
+{
+    test_endpoint_t *link = (test_endpoint_t*) link_context;
+
+    if (!qd_message_receive_complete(message))
+        return;
+
+    switch (link->node->behavior) {
+    case TEST_NODE_SINK :
+        settle_and_replenish(link, delivery, PN_ACCEPTED);
+        break;
+
+    case TEST_NODE_DISCARD :
+        settle_and_replenish(link, delivery, PN_REJECTED);
+        break;
+
+    case TEST_NODE_ECHO :
+        break;
+
+    case TEST_NODE_DENY :
+    case TEST_NODE_SOURCE :
+    case TEST_NODE_SOURCE_PS :
+        assert(false);  // Can't get here.  Link should not have been opened
+        break;
+    }
+}
+
+
+//
+// Detach callback: release the per-link state, unless an endpoint_action still refers
+// to it, in which case mark it detached and let that action perform the free.
+//
+static void detach(void        *link_context,
+                   qdr_error_t *error)
+{
+    test_endpoint_t *link = (test_endpoint_t*) link_context;
+
+    if (!link->in_action_list) {
+        free_endpoint(link);
+        return;
+    }
+
+    link->detached = true;
+}
+
+
+//
+// Cleanup callback: the test nodes take no action here.
+//
+static void cleanup(void *link_context)
+{
+    // Intentionally empty.
+}
+
+
+static qdrc_endpoint_desc_t descriptor = {first_attach, second_attach, flow, update, transfer, detach, cleanup};
+
+
+//
+// Allocate one test node with the given behavior and bind it to the given mobile
+// address (phase '0') using the shared endpoint descriptor.
+//
+static test_node_t *test_node_create(test_module_t *module, test_node_behavior_t behavior, char *address)
+{
+    test_node_t *node = NEW(test_node_t);
+
+    node->core     = module->core;
+    node->module   = module;
+    node->behavior = behavior;
+    node->desc     = &descriptor;
+    DEQ_INIT(node->in_links);
+    DEQ_INIT(node->out_links);
+    qdrc_endpoint_bind_mobile_address_CT(module->core, address, '0', &descriptor, node);
+
+    return node;
+}
+
+
+//
+// Create the test-hooks module instance: one node per test address, bound in the order
+// echo, deny, sink, source, source_ps, discard.
+//
+// @param core  Pointer to the core object
+// @return Newly allocated module state; released by qdrc_test_hooks_core_endpoint_finalize
+//
+static test_module_t *qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core)
+{
+    test_module_t *module = NEW(test_module_t);
+
+    module->core           = core;
+    module->echo_node      = test_node_create(module, TEST_NODE_ECHO,      "org.apache.qpid.dispatch.router/test/echo");
+    module->deny_node      = test_node_create(module, TEST_NODE_DENY,      "org.apache.qpid.dispatch.router/test/deny");
+    module->sink_node      = test_node_create(module, TEST_NODE_SINK,      "org.apache.qpid.dispatch.router/test/sink");
+    module->source_node    = test_node_create(module, TEST_NODE_SOURCE,    "org.apache.qpid.dispatch.router/test/source");
+    module->source_ps_node = test_node_create(module, TEST_NODE_SOURCE_PS, "org.apache.qpid.dispatch.router/test/source_ps");
+    module->discard_node   = test_node_create(module, TEST_NODE_DISCARD,   "org.apache.qpid.dispatch.router/test/discard");
+
+    return module;
+}
+
+
+//
+// Release the module instance created by qdrc_test_hooks_core_endpoint_setup: the six
+// test nodes and then the module structure itself.
+//
+// @param module  Module state; must not be used after this call
+//
+static void qdrc_test_hooks_core_endpoint_finalize(test_module_t *module)
+{
+    free(module->echo_node);
+    free(module->deny_node);
+    free(module->sink_node);
+    free(module->source_node);
+    free(module->source_ps_node);
+    free(module->discard_node);
+
+    // The module structure was heap-allocated by setup and is owned here as well.
+    free(module);
+}
+
+
+//
+// Module init callback.
+//
+// The test hooks are set up only when enabled (by the --test-hooks command line option);
+// otherwise the returned module context is null.
+//
+static void qdrc_test_hooks_init_CT(qdr_core_t *core, void **module_context)
+{
+    *module_context = core->qd->test_hooks ? qdrc_test_hooks_core_endpoint_setup(core) : 0;
+}
+
+
+//
+// Module final callback.  A null context means the test hooks were never set up, so
+// there is nothing to tear down.
+//
+static void qdrc_test_hooks_final_CT(void *module_context)
+{
+    if (module_context == 0)
+        return;
+
+    qdrc_test_hooks_core_endpoint_finalize((test_module_t*) module_context);
+}
+
+
+QDR_CORE_MODULE_DECLARE("core_test_hooks", qdrc_test_hooks_init_CT, qdrc_test_hooks_final_CT)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fac8f7af/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index ae6bbdc..11525d1 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -18,7 +18,7 @@
*/
#include "router_core_private.h"
-#include "core_test_hooks.h"
+#include "module.h"
/**
* Creates a thread that is dedicated to managing and using the routing table.
@@ -31,6 +31,28 @@
ALLOC_DEFINE(qdr_action_t);
+// Registration record for one core module; kept on the registered_modules list.
+typedef struct qdrc_module_t {
+ DEQ_LINKS(struct qdrc_module_t);
+ const char *name; // module name as passed to qdr_register_core_module (pointer stored, not copied)
+ qdrc_module_init_t on_init; // called with (core, &context) when the core thread starts
+ qdrc_module_final_t on_final; // called with (context) when the core thread shuts down
+ void *context; // module context written by on_init and handed back to on_final
+} qdrc_module_t;
+
+DEQ_DECLARE(qdrc_module_t, qdrc_module_list_t);
+static qdrc_module_list_t registered_modules = {0,0};
+
+//
+// Append a module registration to the list of registered modules.  The name pointer is
+// stored as-is (not copied); the module context starts out null.
+//
+void qdr_register_core_module(const char *name, qdrc_module_init_t on_init, qdrc_module_final_t on_final)
+{
+    qdrc_module_t *entry = NEW(qdrc_module_t);
+
+    // Members not named here (list links, context) are zero-initialized.
+    *entry = (qdrc_module_t) {
+        .name     = name,
+        .on_init  = on_init,
+        .on_final = on_final
+    };
+
+    DEQ_INSERT_TAIL(registered_modules, entry);
+}
+
+
static void qdr_activate_connections_CT(qdr_core_t *core)
{
qdr_connection_t *conn = DEQ_HEAD(core->connections_to_activate);
@@ -52,7 +74,16 @@ void *router_core_thread(void *arg)
qdr_forwarder_setup_CT(core);
qdr_route_table_setup_CT(core);
qdr_agent_setup_CT(core);
- qdrc_test_hooks_init_CT(core);
+
+ //
+ // Initialize registered modules
+ //
+ qdrc_module_t *module = DEQ_HEAD(registered_modules);
+ while (module) {
+ qd_log(core->log, QD_LOG_INFO, "Initializing core module: %s", module->name);
+ module->on_init(core, &module->context);
+ module = DEQ_NEXT(module);
+ }
qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s", core->router_area, core->router_id);
while (core->running) {
@@ -93,7 +124,16 @@ void *router_core_thread(void *arg)
qdr_activate_connections_CT(core);
}
- qdrc_test_hooks_final_CT(core);
+ //
+ // Finalize registered modules
+ //
+ module = DEQ_TAIL(registered_modules);
+ while (module) {
+ qd_log(core->log, QD_LOG_INFO, "Finalizing core module: %s", module->name);
+ module->on_final(module->context);
+ module = DEQ_PREV(module);
+ }
+
qd_log(core->log, QD_LOG_INFO, "Router Core thread exited");
return 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org