You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/25 13:57:15 UTC
qpid-dispatch git commit: DISPATCH-1123 - Completed core-endpoint API,
added internal test endpoints enabled from the command line, added tests.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 12692a781 -> d5bbd422c
DISPATCH-1123 - Completed core-endpoint API, added internal test endpoints enabled from the command line, added tests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d5bbd422
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d5bbd422
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d5bbd422
Branch: refs/heads/master
Commit: d5bbd422ce040267e72b97ef8a36974dd57d3f4a
Parents: 12692a7
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Sep 25 09:45:12 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Sep 25 09:45:12 2018 -0400
----------------------------------------------------------------------
include/qpid/dispatch/dispatch.h | 4 +-
router/src/main.c | 21 +-
src/CMakeLists.txt | 1 +
src/dispatch.c | 3 +-
src/dispatch_private.h | 1 +
src/router_core/connections.c | 160 +++++------
src/router_core/core_link_endpoint.c | 59 ++++-
src/router_core/core_link_endpoint.h | 128 +++++----
src/router_core/core_test_hooks.c | 408 +++++++++++++++++++++++++++++
src/router_core/core_test_hooks.h | 28 ++
src/router_core/router_core_private.h | 10 +
src/router_core/router_core_thread.c | 3 +
src/router_core/transfer.c | 19 +-
tests/CMakeLists.txt | 1 +
tests/run_unit_tests.c | 2 +-
tests/system_test.py | 4 +-
tests/system_tests_core_endpoint.py | 231 ++++++++++++++++
17 files changed, 942 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/include/qpid/dispatch/dispatch.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/dispatch.h b/include/qpid/dispatch/dispatch.h
index 1e8b360..3b8f79e 100644
--- a/include/qpid/dispatch/dispatch.h
+++ b/include/qpid/dispatch/dispatch.h
@@ -19,6 +19,7 @@
* under the License.
*/
+#include <stdbool.h>
#include <qpid/dispatch/error.h>
/**@file
@@ -34,9 +35,10 @@ typedef struct qd_dispatch_t qd_dispatch_t;
* Initialize the Dispatch library and prepare it for operation.
*
* @param python_pkgdir The path to the Python files.
+ * @param test_hooks Iff true, enable internal system testing features
* @return A handle to be used in API calls for this instance.
*/
-qd_dispatch_t *qd_dispatch(const char *python_pkgdir);
+qd_dispatch_t *qd_dispatch(const char *python_pkgdir, bool test_hooks);
/**
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/router/src/main.c
----------------------------------------------------------------------
diff --git a/router/src/main.c b/router/src/main.c
index c66ba4f..3b7eecd 100644
--- a/router/src/main.c
+++ b/router/src/main.c
@@ -84,9 +84,9 @@ static void check(int fd) {
check(fd); \
} while(false)
-static void main_process(const char *config_path, const char *python_pkgdir, int fd)
+static void main_process(const char *config_path, const char *python_pkgdir, bool test_hooks, int fd)
{
- dispatch = qd_dispatch(python_pkgdir);
+ dispatch = qd_dispatch(python_pkgdir, test_hooks);
check(fd);
log_source = qd_log_source("MAIN"); /* Logging is initialized by qd_dispatch. */
qd_dispatch_validate_config(config_path);
@@ -123,7 +123,7 @@ static void main_process(const char *config_path, const char *python_pkgdir, int
}
-static void daemon_process(const char *config_path, const char *python_pkgdir,
+static void daemon_process(const char *config_path, const char *python_pkgdir, bool test_hooks,
const char *pidfile, const char *user)
{
int pipefd[2];
@@ -243,7 +243,7 @@ static void daemon_process(const char *config_path, const char *python_pkgdir,
//if (setgid(pwd->pw_gid) < 0) fail(pipefd[1], "Can't set group ID for user %s, errno=%d", user, errno);
}
- main_process((config_path_full ? config_path_full : config_path), python_pkgdir, pipefd[1]);
+ main_process((config_path_full ? config_path_full : config_path), python_pkgdir, test_hooks, pipefd[1]);
free(config_path_full);
} else
@@ -283,6 +283,7 @@ void usage(char **argv) {
fprintf(stdout, " -d, --daemon Run process as a SysV-style daemon\n");
fprintf(stdout, " -P, --pidfile If daemon, the file for the stored daemon pid\n");
fprintf(stdout, " -U, --user If daemon, the username to run as\n");
+ fprintf(stdout, " -T, --test-hooks Enable internal system testing features\n");
fprintf(stdout, " -v, --version Print the version of Qpid Dispatch Router\n");
fprintf(stdout, " -h, --help Print this help\n");
}
@@ -295,6 +296,7 @@ int main(int argc, char **argv)
const char *pidfile = 0;
const char *user = 0;
bool daemon_mode = false;
+ bool test_hooks = false;
static struct option long_options[] = {
{"config", required_argument, 0, 'c'},
@@ -304,11 +306,12 @@ int main(int argc, char **argv)
{"user", required_argument, 0, 'U'},
{"help", no_argument, 0, 'h'},
{"version", no_argument, 0, 'v'},
+ {"test-hooks", no_argument, 0, 'T'},
{0, 0, 0, 0}
};
while (1) {
- int c = getopt_long(argc, argv, "c:I:dP:U:h:v", long_options, 0);
+ int c = getopt_long(argc, argv, "c:I:dP:U:h:vT", long_options, 0);
if (c == -1)
break;
@@ -341,6 +344,10 @@ int main(int argc, char **argv)
fprintf(stdout, "%s\n", QPID_DISPATCH_VERSION);
exit(0);
+ case 'T' :
+ test_hooks = true;
+ break;
+
case '?' :
usage(argv);
exit(1);
@@ -355,9 +362,9 @@ int main(int argc, char **argv)
}
if (daemon_mode)
- daemon_process(config_path, python_pkgdir, pidfile, user);
+ daemon_process(config_path, python_pkgdir, test_hooks, pidfile, user);
else
- main_process(config_path, python_pkgdir, 2);
+ main_process(config_path, python_pkgdir, test_hooks, 2);
return 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c533e9f..c3d35b9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -85,6 +85,7 @@ set(qpid_dispatch_SOURCES
router_core/agent_router.c
router_core/connections.c
router_core/core_link_endpoint.c
+ router_core/core_test_hooks.c
router_core/edge_control.c
router_core/error.c
router_core/exchange_bindings.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index 2966bf3..9fb2186 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -58,7 +58,7 @@ const char *MULTICAST_DISTRIBUTION = "multicast";
const char *BALANCED_DISTRIBUTION = "balanced";
const char *UNAVAILABLE_DISTRIBUTION = "unavailable";
-qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
+qd_dispatch_t *qd_dispatch(const char *python_pkgdir, bool test_hooks)
{
qd_dispatch_t *qd = NEW(qd_dispatch_t);
ZERO(qd);
@@ -84,6 +84,7 @@ qd_dispatch_t *qd_dispatch(const char *python_pkgdir)
qd_dispatch_set_router_id(qd, strdup("0"));
qd->router_mode = QD_ROUTER_MODE_ENDPOINT;
qd->default_treatment = QD_TREATMENT_LINK_BALANCED;
+ qd->test_hooks = test_hooks;
qd_python_initialize(qd, python_pkgdir);
if (qd_error_code()) { qd_dispatch_free(qd); return 0; }
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/dispatch_private.h
----------------------------------------------------------------------
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index d927023..36065dc 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -57,6 +57,7 @@ struct qd_dispatch_t {
char *router_id;
qd_router_mode_t router_mode;
bool allow_resumable_link_route;
+ bool test_hooks;
};
/**
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 9df65d1..d51dd39 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -816,6 +816,13 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
DEQ_REMOVE(core->open_links, link);
//
+ // If the link has a core_endpoint, allow the core_endpoint module to
+ // clean up its state
+ //
+ if (link->core_endpoint)
+ qdrc_endpoint_do_cleanup_CT(core, link->core_endpoint);
+
+ //
// If the link has a connected peer, unlink the peer
//
if (link->connected_link) {
@@ -1890,96 +1897,101 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
//
link->detach_count++;
- //
- // For routed links, propagate the detach
- //
- if (link->connected_link) {
+ if (link->core_endpoint) {
+ qdrc_endpoint_do_detach_CT(core, link->core_endpoint, error);
+
+ } else {
//
- // If the connected link is outgoing and there is a delivery on the connected link's undelivered
- // list that is not receive-complete, we must flag that delivery as aborted or it will forever
- // block the propagation of the detach.
+ // For routed links, propagate the detach
//
- if (link->connected_link->link_direction == QD_OUTGOING)
- qdr_link_abort_undelivered_CT(core, link->connected_link);
+ if (link->connected_link) {
+ //
+ // If the connected link is outgoing and there is a delivery on the connected link's undelivered
+ // list that is not receive-complete, we must flag that delivery as aborted or it will forever
+ // block the propagation of the detach.
+ //
+ if (link->connected_link->link_direction == QD_OUTGOING)
+ qdr_link_abort_undelivered_CT(core, link->connected_link);
- if (dt != QD_LOST)
- qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt == QD_CLOSED);
- else {
- qdr_link_outbound_detach_CT(core, link->connected_link, 0, QDR_CONDITION_ROUTED_LINK_LOST, !link->terminus_survives_disconnect);
- qdr_error_free(error);
+ if (dt != QD_LOST)
+ qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE, dt == QD_CLOSED);
+ else {
+ qdr_link_outbound_detach_CT(core, link->connected_link, 0, QDR_CONDITION_ROUTED_LINK_LOST, !link->terminus_survives_disconnect);
+ qdr_error_free(error);
+ }
+
+ //
+ // If the link is completely detached, release its resources
+ //
+ if (link->detach_count == 2) {
+ qdr_link_cleanup_CT(core, conn, link);
+ free_qdr_link_t(link);
+ }
+
+ return;
}
//
- // If the link is completely detached, release its resources
+ // For auto links, switch the auto link to failed state and record the error
//
- if (link->detach_count == 2) {
- qdr_link_cleanup_CT(core, conn, link);
- free_qdr_link_t(link);
- }
+ if (link->auto_link) {
+ link->auto_link->link = 0;
+ link->auto_link->state = QDR_AUTO_LINK_STATE_FAILED;
+ free(link->auto_link->last_error);
+ link->auto_link->last_error = qdr_error_description(error);
- return;
- }
+ //
+ // The auto link has failed. Periodically retry setting up the auto link until
+ // it succeeds.
+ //
+ qdr_route_auto_link_detached_CT(core, link);
+ }
- //
- // For auto links, switch the auto link to failed state and record the error
- //
- if (link->auto_link) {
- link->auto_link->link = 0;
- link->auto_link->state = QDR_AUTO_LINK_STATE_FAILED;
- free(link->auto_link->last_error);
- link->auto_link->last_error = qdr_error_description(error);
+ if (link->link_direction == QD_INCOMING) {
+ //
+ // Handle incoming link cases
+ //
+ switch (link->link_type) {
+ case QD_LINK_ENDPOINT:
+ if (addr) {
+ //
+ // Drain the undelivered list to ensure deliveries don't get dropped by a detach.
+ //
+ qdr_drain_inbound_undelivered_CT(core, link, addr);
- //
- // The auto link has failed. Periodically retry setting up the auto link until
- // it succeeds.
- //
- qdr_route_auto_link_detached_CT(core, link);
- }
+ //
+ // Remove this link from the linked address.
+ //
+ qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
+ }
+ break;
- if (link->link_direction == QD_INCOMING) {
- //
- // Handle incoming link cases
- //
- switch (link->link_type) {
- case QD_LINK_ENDPOINT:
- if (addr) {
- //
- // Drain the undelivered list to ensure deliveries don't get dropped by a detach.
- //
- qdr_drain_inbound_undelivered_CT(core, link, addr);
+ case QD_LINK_CONTROL:
+ break;
- //
- // Remove this link from the linked address.
- //
- qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
+ case QD_LINK_ROUTER:
+ break;
}
- break;
+ } else {
+ //
+ // Handle outgoing link cases
+ //
+ switch (link->link_type) {
+ case QD_LINK_ENDPOINT:
+ if (addr) {
+ qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
+ was_local = true;
+ }
+ break;
- case QD_LINK_CONTROL:
- break;
+ case QD_LINK_CONTROL:
+ qdr_detach_link_control_CT(core, conn, link);
+ break;
- case QD_LINK_ROUTER:
- break;
- }
- } else {
- //
- // Handle outgoing link cases
- //
- switch (link->link_type) {
- case QD_LINK_ENDPOINT:
- if (addr) {
- qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
- was_local = true;
+ case QD_LINK_ROUTER:
+ qdr_detach_link_data_CT(core, conn, link);
+ break;
}
- break;
-
- case QD_LINK_CONTROL:
- qdr_detach_link_control_CT(core, conn, link);
- break;
-
- case QD_LINK_ROUTER:
- qdr_detach_link_data_CT(core, conn, link);
- break;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/core_link_endpoint.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index b8b6edc..961c219 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -19,6 +19,7 @@
#include "core_link_endpoint.h"
#include "qpid/dispatch/alloc.h"
+#include <stdio.h>
struct qdrc_endpoint_t {
qdrc_endpoint_desc_t *desc;
@@ -40,7 +41,7 @@ void qdrc_endpoint_bind_mobile_address_CT(qdr_core_t *core,
qd_iterator_annotate_phase(iter, phase);
qd_hash_retrieve(core->addr_hash, iter, (void*) &addr);
- if (addr) {
+ if (!addr) {
qd_address_treatment_t treatment = qdr_treatment_for_address_CT(core, 0, iter, 0, 0);
if (treatment == QD_TREATMENT_UNAVAILABLE)
treatment = QD_TREATMENT_ANYCAST_BALANCED;
@@ -110,6 +111,21 @@ void qdrc_endpoint_send_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t
}
+qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qd_message_t *message)
+{
+ qdr_delivery_t *dlv = new_qdr_delivery_t();
+ uint64_t *tag = (uint64_t*) dlv->tag;
+
+ ZERO(dlv);
+ dlv->link = endpoint->link;
+ dlv->msg = message;
+ *tag = core->next_tag++;
+ dlv->tag_length = 8;
+ dlv->ingress_index = -1;
+ return dlv;
+}
+
+
void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *dlv, uint64_t disposition)
{
//
@@ -133,6 +149,10 @@ void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *dlv, uint64_t dis
void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
{
qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE, true);
+ if (ep->link->detach_count == 2) {
+ ep->link->core_endpoint = 0;
+ free_qdrc_endpoint_t(ep);
+ }
}
@@ -143,13 +163,48 @@ bool qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr
ep->desc = addr->core_endpoint;
ep->link = link;
+ link->core_endpoint = ep;
+
*error = 0;
bool accept = !!ep->desc->on_first_attach ?
ep->desc->on_first_attach(addr->core_endpoint_context, ep, &ep->link_context, error) : false;
- if (!accept)
+ if (!accept) {
+ link->core_endpoint = 0;
free_qdrc_endpoint_t(ep);
+ }
return accept;
}
+
+
+void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t *dlv)
+{
+ ep->desc->on_transfer(ep->link_context, dlv, dlv->msg);
+}
+
+
+void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, bool drain)
+{
+ ep->desc->on_flow(ep->link_context, credit, drain);
+}
+
+
+void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error)
+{
+ ep->desc->on_detach(ep->link_context, error);
+ if (ep->link->detach_count == 2) {
+ ep->link->core_endpoint = 0;
+ free_qdrc_endpoint_t(ep);
+ }
+}
+
+
+void qdrc_endpoint_do_cleanup_CT(qdr_core_t *core, qdrc_endpoint_t *ep)
+{
+ ep->desc->on_cleanup(ep->link_context);
+ free_qdrc_endpoint_t(ep);
+}
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/core_link_endpoint.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_link_endpoint.h b/src/router_core/core_link_endpoint.h
index 1bfbb3c..4e9c2a0 100644
--- a/src/router_core/core_link_endpoint.h
+++ b/src/router_core/core_link_endpoint.h
@@ -32,12 +32,12 @@ typedef struct qdrc_endpoint_t qdrc_endpoint_t;
/**
* Event - An attach for a new core-endpoint link has arrived
*
- * @Param bind_context The opaque context provided in the mobile address binding
- * @Param endpoint A new endpoint object for the link. If the link is accepted, this
+ * @param bind_context The opaque context provided in the mobile address binding
+ * @param endpoint A new endpoint object for the link. If the link is accepted, this
* object must be stored for later use.
- * @Param link_context [out] Handler-provided opaque context to be associated with the endpoint
- * @Param error [out] Error indication which may be supplied if the link is rejected
- * @Return True if the link is to be accepted, False if the link should be rejected and detached
+ * @param link_context [out] Handler-provided opaque context to be associated with the endpoint
+ * @param error [out] Error indication which may be supplied if the link is rejected
+ * @return True if the link is to be accepted, False if the link should be rejected and detached
*/
typedef bool (*qdrc_first_attach_t) (void *bind_context,
qdrc_endpoint_t *endpoint,
@@ -51,16 +51,20 @@ typedef bool (*qdrc_first_attach_t) (void *bind_context,
* is the responsibility of the core-endpoint to supply credit at the appropriate time
* by calling qdrc_endpoint_flow_CT.
*
- * @Param link_context The opaque context supplied in the call to qdrc_endpoint_create_link_CT
+ * @param link_context The opaque context supplied in the call to qdrc_endpoint_create_link_CT
+ * @param remote_source Pointer to the remote source terminus of the link
+ * @param remote_target Pointer to the remote target terminus of the link
*/
-typedef bool (*qdrc_second_attach_t) (void *link_context);
+typedef void (*qdrc_second_attach_t) (void *link_context,
+ qdr_terminus_t *remote_source,
+ qdr_terminus_t *remote_target);
/**
* Event - Credit/Drain status for an outgoing core-endpoint link has changed
*
- * @Param link_context The opaque context associated with the endpoint link
- * @Param available_credit The number of deliveries that may be sent on this link
- * @Param drain True iff the peer receiver is requesting that the credit be drained
+ * @param link_context The opaque context associated with the endpoint link
+ * @param available_credit The number of deliveries that may be sent on this link
+ * @param drain True iff the peer receiver is requesting that the credit be drained
*/
typedef void (*qdrc_flow_t) (void *link_context,
int available_credit,
@@ -69,10 +73,10 @@ typedef void (*qdrc_flow_t) (void *link_context,
/**
* Event - The settlement and/or disposition of a delivery has been updated
*
- * @Param link_context The opaque context associated with the endpoint link
- * @Param delivery The delivery object experiencing the change
- * @Param settled True iff the delivery has been settled by the peer
- * @Param disposition The disposition of the delivery (PN_[ACCEPTED|REJECTED|MODIFIED|RELEASED])
+ * @param link_context The opaque context associated with the endpoint link
+ * @param delivery The delivery object experiencing the change
+ * @param settled True iff the delivery has been settled by the peer
+ * @param disposition The disposition of the delivery (PN_[ACCEPTED|REJECTED|MODIFIED|RELEASED])
*/
typedef void (*qdrc_update_t) (void *link_context,
qdr_delivery_t *delivery,
@@ -86,9 +90,9 @@ typedef void (*qdrc_update_t) (void *link_context,
* required, this handler _must_ use the qd_message_receive_complete method on the
* message to ensure the message has been completely received.
*
- * @Param link_context The opaque context associated with the endpoint link
- * @Param delivery Pointer to the delivery object for the transfer
- * @Param message Pointer to the message being transferred
+ * @param link_context The opaque context associated with the endpoint link
+ * @param delivery Pointer to the delivery object for the transfer
+ * @param message Pointer to the message being transferred
*/
typedef void (*qdrc_transfer_t) (void *link_context,
qdr_delivery_t *delivery,
@@ -97,13 +101,26 @@ typedef void (*qdrc_transfer_t) (void *link_context,
/**
* Event - A core-endpoint link has been detached
*
- * @Param link_context The opaque context associated with the endpoint link
- * @Param error The error information that came with the detach or 0 if no error
+ * Note: It is safe to discard objects referenced by the link_context in this handler.
+ * There will be no further references to this link_context returned after this call.
+ *
+ * @param link_context The opaque context associated with the endpoint link
+ * @param error The error information that came with the detach or 0 if no error
*/
typedef void (*qdrc_detach_t) (void *link_context,
qdr_error_t *error);
+/**
+ * Event - A core-endpoint link has been cleaned up (not cleanly detached)
+ *
+ * This handler must free all resources associated with the link-context.
+ *
+ * @param link_context The opaque context associated with the endpoint link
+ */
+typedef void (*qdrc_cleanup_t) (void *link_context);
+
+
typedef struct qdrc_endpoint_desc_t {
qdrc_first_attach_t on_first_attach;
qdrc_second_attach_t on_second_attach;
@@ -111,6 +128,7 @@ typedef struct qdrc_endpoint_desc_t {
qdrc_update_t on_update;
qdrc_transfer_t on_transfer;
qdrc_detach_t on_detach;
+ qdrc_cleanup_t on_cleanup;
} qdrc_endpoint_desc_t;
@@ -121,11 +139,11 @@ typedef struct qdrc_endpoint_desc_t {
* to the core-endpoint. Any incoming links with terminus addresses that match
* this address will be directed to the core-endpoint for handling.
*
- * @Param core Pointer to the core object
- * @Param address The address as a null-terminated character string
- * @Param phase The phase of the address (typically '0')
- * @Param desc The descriptor for this core endpoint containing all callbacks
- * @Param bind_context An opaque context that will be included in the call to on_first_attach
+ * @param core Pointer to the core object
+ * @param address The address as a null-terminated character string
+ * @param phase The phase of the address (typically '0')
+ * @param desc The descriptor for this core endpoint containing all callbacks
+ * @param bind_context An opaque context that will be included in the call to on_first_attach
*/
void qdrc_endpoint_bind_mobile_address_CT(qdr_core_t *core,
const char *address,
@@ -140,14 +158,14 @@ void qdrc_endpoint_bind_mobile_address_CT(qdr_core_t *core,
* Initiate the attachment of a new link outbound to a remote node. The link will
* be known to be fully attached when the on_second_attach callback is invoked.
*
- * @Param core Pointer to the core object
- * @Param conn Pointer to the connection object over which the link will be created
- * @Param dir The direction of the link: QD_INCOMING or QD_OUTGOING
- * @Param local_source The source terminus of the link - must be included for incoming links
- * @Param local_target The target terminus of the link - must be included for outgoing links
- * @Param desc The descriptor for this core endpoint containing all callbacks
- * @Param link_context An opaque context that will be included in the calls to the callbacks
- * @Return Pointer to a new qdrc_endpoint_t for tracking the link
+ * @param core Pointer to the core object
+ * @param conn Pointer to the connection object over which the link will be created
+ * @param dir The direction of the link: QD_INCOMING or QD_OUTGOING
+ * @param local_source The source terminus of the link - must be included for incoming links
+ * @param local_target The target terminus of the link - must be included for outgoing links
+ * @param desc The descriptor for this core endpoint containing all callbacks
+ * @param link_context An opaque context that will be included in the calls to the callbacks
+ * @return Pointer to a new qdrc_endpoint_t for tracking the link
*/
qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t *core,
qdr_connection_t *conn,
@@ -163,8 +181,8 @@ qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t *core,
* - The link's direction of delivery flow
* - The link's connection
*
- * @Param endpoint Pointer to an endpoint object
- * @Return The requested information (or 0 if not present)
+ * @param endpoint Pointer to an endpoint object
+ * @return The requested information (or 0 if not present)
*/
qd_direction_t qdrc_endpoint_get_direction_CT(const qdrc_endpoint_t *endpoint);
qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *endpoint);
@@ -172,40 +190,50 @@ qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *endpoint);
/**
* Issue credit and control drain for an incoming link
*
- * @Param core Pointer to the core object
- * @Param endpoint Pointer to an endpoint object
- * @Param credit_added Number of credits to grant to the sender
- * @Param drain Indication that you want the sender to drain available credit
+ * @param core Pointer to the core object
+ * @param endpoint Pointer to an endpoint object
+ * @param credit_added Number of credits to grant to the sender
+ * @param drain Indication that you want the sender to drain available credit
*/
void qdrc_endpoint_flow_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, int credit_added, bool drain);
/**
* Send a message via an outgoing link
*
- * @Param core Pointer to the core object
- * @Param endpoint Pointer to an endpoint object
- * @Param delivery A delivery containing a message that is to be sent on the link
- * @Param presettled True iff the delivery is to be presettled. If presettled, no further action
+ * @param core Pointer to the core object
+ * @param endpoint Pointer to an endpoint object
+ * @param delivery A delivery containing a message that is to be sent on the link
+ * @param presettled True iff the delivery is to be presettled. If presettled, no further action
* will be needed for the delivery. If not presettled, an on_update event for
* the delivery should be expected.
*/
void qdrc_endpoint_send_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_delivery_t *delivery, bool presettled);
/**
- * Settle a received delivery with a specified disposition
+ * Allocate a delivery from a given endpoint
*
* @Param core Pointer to the core object
- * @Param delivery Pointer to a received delivery
- * @Param disposition The desired disposision of the delivery (use PN_[ACCEPTED|REJECTED|MODIFIED|RELEASED])
+ * @Param endpoint Pointer to an endpoint object
+ * @Param message An outgoing message to be associated with the delivery
+ * @Return A delivery that can be passed to qdrc_endpoint_send_CT
+ */
+qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qd_message_t *message);
+
+/**
+ * Settle a received delivery with a specified disposition
+ *
+ * @param core Pointer to the core object
+ * @param delivery Pointer to a received delivery
+ * @param disposition The desired disposision of the delivery (use PN_[ACCEPTED|REJECTED|MODIFIED|RELEASED])
*/
void qdrc_endpoint_settle_CT(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition);
/**
* Detach a link attached to the core-endpoint
*
- * @Param core Pointer to the core object
- * @Param endpoint Pointer to an endpoint object
- * @Param error An error indication or 0 for no error
+ * @param core Pointer to the core object
+ * @param endpoint Pointer to an endpoint object
+ * @param error An error indication or 0 for no error
*/
void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error);
@@ -215,5 +243,9 @@ void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_er
//=====================================================================================
bool qdrc_endpoint_do_bound_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link, qdr_error_t **error);
+void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_delivery_t *delivery);
+void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, int credit, bool drain);
+void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error);
+void qdrc_endpoint_do_cleanup_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint);
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/core_test_hooks.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_test_hooks.c b/src/router_core/core_test_hooks.c
new file mode 100644
index 0000000..05cb75d
--- /dev/null
+++ b/src/router_core/core_test_hooks.c
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/dispatch/ctools.h"
+#include "qpid/dispatch/message.h"
+#include "qpid/dispatch/compose.h"
+#include "core_test_hooks.h"
+#include "core_link_endpoint.h"
+#include <stdio.h>
+#include <inttypes.h>
+
+typedef enum {
+ TEST_NODE_ECHO,
+ TEST_NODE_DENY,
+ TEST_NODE_SINK,
+ TEST_NODE_SOURCE,
+ TEST_NODE_SOURCE_PS,
+ TEST_NODE_DISCARD
+} test_node_behavior_t;
+
+typedef struct test_node_t test_node_t;
+
+typedef struct test_endpoint_t {
+ DEQ_LINKS(struct test_endpoint_t);
+ test_node_t *node;
+ qdrc_endpoint_t *ep;
+ qdr_delivery_list_t deliveries;
+ int credit;
+ bool in_action_list;
+ bool detached;
+} test_endpoint_t;
+
+DEQ_DECLARE(test_endpoint_t, test_endpoint_list_t);
+
+struct test_node_t {
+ qdr_core_t *core;
+ test_node_behavior_t behavior;
+ qdrc_endpoint_desc_t *desc;
+ test_endpoint_list_t in_links;
+ test_endpoint_list_t out_links;
+};
+
+static test_node_t *echo_node;
+static test_node_t *deny_node;
+static test_node_t *sink_node;
+static test_node_t *source_node;
+static test_node_t *source_ps_node;
+static test_node_t *discard_node;
+
+
+static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+
+static void source_send(test_endpoint_t *ep, bool presettled)
+{
+ static uint32_t sequence = 0;
+ static char stringbuf[100];
+ qdr_delivery_t *dlv;
+ qd_message_t *msg = qd_message();
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+
+ sprintf(stringbuf, "Sequence: %"PRIu32, sequence);
+
+ qd_compose_start_list(field);
+ qd_compose_insert_bool(field, 0); // durable
+ qd_compose_end_list(field);
+
+ field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
+ qd_compose_start_list(field);
+ qd_compose_insert_null(field); // message-id
+ qd_compose_end_list(field);
+
+ field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
+ qd_compose_start_map(field);
+ qd_compose_insert_symbol(field, "sequence");
+ qd_compose_insert_uint(field, sequence++);
+ qd_compose_end_map(field);
+
+ field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
+ qd_compose_insert_string(field, stringbuf);
+
+ dlv = qdrc_endpoint_delivery_CT(ep->node->core, ep->ep, msg);
+ qd_message_compose_2(msg, field);
+ qd_compose_free(field);
+ qdrc_endpoint_send_CT(ep->node->core, ep->ep, dlv, presettled);
+
+ if (--ep->credit > 0) {
+ qdr_action_t *action = qdr_action(endpoint_action, "test_hooks_endpoint_action");
+ action->args.general.context_1 = (void*) ep;
+ ep->in_action_list = true;
+ qdr_action_enqueue(ep->node->core, action);
+ }
+}
+
+
+static void free_endpoint(test_endpoint_t *ep)
+{
+ test_node_t *node = ep->node;
+
+ if (qdrc_endpoint_get_direction_CT(ep->ep) == QD_INCOMING)
+ DEQ_REMOVE(node->in_links, ep);
+ else
+ DEQ_REMOVE(node->out_links, ep);
+ free(ep);
+}
+
+
+static void endpoint_action(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ if (discard)
+ return;
+
+ test_endpoint_t *ep = (test_endpoint_t*) action->args.general.context_1;
+
+ ep->in_action_list = false;
+ if (ep->detached) {
+ free_endpoint(ep);
+ return;
+ }
+
+ switch (ep->node->behavior) {
+ case TEST_NODE_DENY :
+ case TEST_NODE_SINK :
+ case TEST_NODE_DISCARD :
+
+ case TEST_NODE_SOURCE :
+ source_send(ep, false);
+ break;
+
+ case TEST_NODE_SOURCE_PS :
+ source_send(ep, true);
+ break;
+
+ case TEST_NODE_ECHO :
+ break;
+ }
+}
+
+
+static bool first_attach(void *bind_context,
+ qdrc_endpoint_t *endpoint,
+ void **link_context,
+ qdr_error_t **error)
+{
+ test_node_t *node = (test_node_t*) bind_context;
+ test_endpoint_t *test_ep = 0;
+ bool incoming = qdrc_endpoint_get_direction_CT(endpoint) == QD_INCOMING;
+
+ switch (node->behavior) {
+ case TEST_NODE_DENY :
+ *error = qdr_error("qd:forbidden", "Connectivity to the deny node is forbidden");
+ return false;
+
+ case TEST_NODE_ECHO :
+ break;
+
+ case TEST_NODE_SINK :
+ if (incoming) {
+ qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
+ } else {
+ *error = qdr_error("qd:forbidden", "Sink function only accepts incoming links");
+ return false;
+ }
+ break;
+
+ case TEST_NODE_SOURCE :
+ case TEST_NODE_SOURCE_PS :
+ if (incoming) {
+ *error = qdr_error("qd:forbidden", "Source function only accepts outgoing links");
+ return false;
+ }
+ break;
+
+ case TEST_NODE_DISCARD :
+ if (incoming) {
+ qdrc_endpoint_flow_CT(node->core, endpoint, 1, false);
+ } else {
+ *error = qdr_error("qd:forbidden", "Discard function only accepts incoming links");
+ return false;
+ }
+ break;
+ }
+
+ test_ep = NEW(test_endpoint_t);
+ ZERO(test_ep);
+ test_ep->node = node;
+ test_ep->ep = endpoint;
+ *link_context = test_ep;
+
+ if (incoming)
+ DEQ_INSERT_TAIL(node->in_links, test_ep);
+ else
+ DEQ_INSERT_TAIL(node->out_links, test_ep);
+
+ return true;
+}
+
+
+static void second_attach(void *link_context,
+ qdr_terminus_t *remote_source,
+ qdr_terminus_t *remote_target)
+{
+}
+
+
+static void flow(void *link_context,
+ int available_credit,
+ bool drain)
+{
+ test_endpoint_t *ep = (test_endpoint_t*) link_context;
+ if (available_credit == 0)
+ return;
+
+ ep->credit = available_credit;
+
+ switch (ep->node->behavior) {
+ case TEST_NODE_DENY :
+ case TEST_NODE_SINK :
+ case TEST_NODE_DISCARD :
+ break;
+
+ case TEST_NODE_SOURCE :
+ source_send(ep, false);
+ break;
+
+ case TEST_NODE_SOURCE_PS :
+ source_send(ep, true);
+ break;
+
+ case TEST_NODE_ECHO :
+ break;
+ }
+}
+
+
+static void update(void *link_context,
+ qdr_delivery_t *delivery,
+ bool settled,
+ uint64_t disposition)
+{
+}
+
+
+static void transfer(void *link_context,
+ qdr_delivery_t *delivery,
+ qd_message_t *message)
+{
+ test_endpoint_t *ep = (test_endpoint_t*) link_context;
+
+ if (!qd_message_receive_complete(message))
+ return;
+
+ switch (ep->node->behavior) {
+ case TEST_NODE_DENY :
+ case TEST_NODE_SOURCE :
+ case TEST_NODE_SOURCE_PS :
+ assert(false); // Can't get here. Link should not have been opened
+ break;
+
+ case TEST_NODE_ECHO :
+ break;
+
+ case TEST_NODE_SINK :
+ qdrc_endpoint_settle_CT(ep->node->core, delivery, PN_ACCEPTED);
+ qdrc_endpoint_flow_CT(ep->node->core, ep->ep, 1, false);
+ break;
+
+ case TEST_NODE_DISCARD :
+ qdrc_endpoint_settle_CT(ep->node->core, delivery, PN_REJECTED);
+ qdrc_endpoint_flow_CT(ep->node->core, ep->ep, 1, false);
+ break;
+ }
+}
+
+
+static void detach(void *link_context,
+ qdr_error_t *error)
+{
+ test_endpoint_t *ep = (test_endpoint_t*) link_context;
+
+ if (ep->in_action_list) {
+ ep->detached = true;
+ } else {
+ free_endpoint(ep);
+ }
+}
+
+
+static void cleanup(void *link_context)
+{
+}
+
+
+static qdrc_endpoint_desc_t descriptor = {first_attach, second_attach, flow, update, transfer, detach, cleanup};
+
+
+static void qdrc_test_hooks_core_endpoint_setup(qdr_core_t *core)
+{
+ char *echo_address = "org.apache.qpid.dispatch.router/test/echo";
+ char *deny_address = "org.apache.qpid.dispatch.router/test/deny";
+ char *sink_address = "org.apache.qpid.dispatch.router/test/sink";
+ char *source_address = "org.apache.qpid.dispatch.router/test/source";
+ char *source_ps_address = "org.apache.qpid.dispatch.router/test/source_ps";
+ char *discard_address = "org.apache.qpid.dispatch.router/test/discard";
+
+ echo_node = NEW(test_node_t);
+ deny_node = NEW(test_node_t);
+ sink_node = NEW(test_node_t);
+ source_node = NEW(test_node_t);
+ source_ps_node = NEW(test_node_t);
+ discard_node = NEW(test_node_t);
+
+ echo_node->core = core;
+ echo_node->behavior = TEST_NODE_ECHO;
+ echo_node->desc = &descriptor;
+ DEQ_INIT(echo_node->in_links);
+ DEQ_INIT(echo_node->out_links);
+ qdrc_endpoint_bind_mobile_address_CT(core, echo_address, '0', &descriptor, echo_node);
+
+ deny_node->core = core;
+ deny_node->behavior = TEST_NODE_DENY;
+ deny_node->desc = &descriptor;
+ DEQ_INIT(deny_node->in_links);
+ DEQ_INIT(deny_node->out_links);
+ qdrc_endpoint_bind_mobile_address_CT(core, deny_address, '0', &descriptor, deny_node);
+
+ sink_node->core = core;
+ sink_node->behavior = TEST_NODE_SINK;
+ sink_node->desc = &descriptor;
+ DEQ_INIT(sink_node->in_links);
+ DEQ_INIT(sink_node->out_links);
+ qdrc_endpoint_bind_mobile_address_CT(core, sink_address, '0', &descriptor, sink_node);
+
+ source_node->core = core;
+ source_node->behavior = TEST_NODE_SOURCE;
+ source_node->desc = &descriptor;
+ DEQ_INIT(source_node->in_links);
+ DEQ_INIT(source_node->out_links);
+ qdrc_endpoint_bind_mobile_address_CT(core, source_address, '0', &descriptor, source_node);
+
+ source_ps_node->core = core;
+ source_ps_node->behavior = TEST_NODE_SOURCE_PS;
+ source_ps_node->desc = &descriptor;
+ DEQ_INIT(source_ps_node->in_links);
+ DEQ_INIT(source_ps_node->out_links);
+ qdrc_endpoint_bind_mobile_address_CT(core, source_ps_address, '0', &descriptor, source_ps_node);
+
+ discard_node->core = core;
+ discard_node->behavior = TEST_NODE_DISCARD;
+ discard_node->desc = &descriptor;
+ DEQ_INIT(discard_node->in_links);
+ DEQ_INIT(discard_node->out_links);
+ qdrc_endpoint_bind_mobile_address_CT(core, discard_address, '0', &descriptor, discard_node);
+}
+
+
+static void qdrc_test_hooks_core_endpoint_finalize(qdr_core_t *core)
+{
+ free(echo_node);
+ free(deny_node);
+ free(sink_node);
+ free(source_node);
+ free(source_ps_node);
+ free(discard_node);
+}
+
+
+void qdrc_test_hooks_init_CT(qdr_core_t *core)
+{
+ //
+ // Exit if the test hooks are not enabled (by the --test-hooks command line option)
+ //
+ if (!core->qd->test_hooks)
+ return;
+
+ qd_log(core->log, QD_LOG_INFO, "Core thread system test hooks enabled");
+
+ qdrc_test_hooks_core_endpoint_setup(core);
+}
+
+
+void qdrc_test_hooks_final_CT(qdr_core_t *core)
+{
+ //
+ // Exit if the test hooks are not enabled (by the --test-hooks command line option)
+ //
+ if (!core->qd->test_hooks)
+ return;
+
+ qdrc_test_hooks_core_endpoint_finalize(core);
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/core_test_hooks.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_test_hooks.h b/src/router_core/core_test_hooks.h
new file mode 100644
index 0000000..7be20f3
--- /dev/null
+++ b/src/router_core/core_test_hooks.h
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef qd_router_core_test_hooks
+#define qd_router_core_test_hooks 1
+
+#include "router_core_private.h"
+
+void qdrc_test_hooks_init_CT(qdr_core_t *core);
+void qdrc_test_hooks_final_CT(qdr_core_t *core);
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 13c7dc4..1a4450d 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -154,6 +154,16 @@ struct qdr_action_t {
qd_buffer_list_t body_buffers;
} agent;
+ //
+ // Arguments for general use
+ //
+ struct {
+ void *context_1;
+ void *context_2;
+ void *context_3;
+ void *context_4;
+ } general;
+
} args;
};
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index e5b00f5..ae6bbdc 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -18,6 +18,7 @@
*/
#include "router_core_private.h"
+#include "core_test_hooks.h"
/**
* Creates a thread that is dedicated to managing and using the routing table.
@@ -51,6 +52,7 @@ void *router_core_thread(void *arg)
qdr_forwarder_setup_CT(core);
qdr_route_table_setup_CT(core);
qdr_agent_setup_CT(core);
+ qdrc_test_hooks_init_CT(core);
qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s", core->router_area, core->router_id);
while (core->running) {
@@ -91,6 +93,7 @@ void *router_core_thread(void *arg)
qdr_activate_connections_CT(core);
}
+ qdrc_test_hooks_final_CT(core);
qd_log(core->log, QD_LOG_INFO, "Router Core thread exited");
return 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 68645ea..361e2e1 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -748,11 +748,13 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
activate = true;
}
- //
- // If this is an attach-routed link, propagate the flow data downrange.
- // Note that the credit value is incremental.
- //
- if (link->connected_link) {
+ if (link->core_endpoint) {
+ qdrc_endpoint_do_flow_CT(core, link->core_endpoint, credit, drain);
+ } else if (link->connected_link) {
+ //
+ // If this is an attach-routed link, propagate the flow data downrange.
+ // Note that the credit value is incremental.
+ //
qdr_link_t *clink = link->connected_link;
if (clink->link_direction == QD_INCOMING)
@@ -983,6 +985,13 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
bool more = action->args.connection.more;
+ //
+ // If this link has a core_endpoint, direct deliveries to that endpoint.
+ //
+ if (!!link->core_endpoint) {
+ qdrc_endpoint_do_deliver_CT(core, link->core_endpoint, dlv);
+ return;
+ }
if (link->connected_link) {
if (link->link_direction == QD_INCOMING)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 4c2cf11..c54e9d4 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -118,6 +118,7 @@ foreach(py_test_module
system_tests_bad_configuration
system_tests_ssl
system_tests_edge_router
+ system_tests_core_endpoint
${SYSTEM_TESTS_HTTP}
${CONSOLE_TEST}
)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/tests/run_unit_tests.c
----------------------------------------------------------------------
diff --git a/tests/run_unit_tests.c b/tests/run_unit_tests.c
index 3434a90..77b9845 100644
--- a/tests/run_unit_tests.c
+++ b/tests/run_unit_tests.c
@@ -41,7 +41,7 @@ int main(int argc, char** argv)
int result = 0;
// Call qd_dispatch() first initialize allocator used by other tests.
- qd_dispatch_t *qd = qd_dispatch(0);
+ qd_dispatch_t *qd = qd_dispatch(0, false);
qd_dispatch_validate_config(argv[1]);
if (qd_error_code()) {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index 5cff90c..7794d61 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -325,7 +325,7 @@ class Qdrouterd(Process):
self.defaults()
return "".join(["%s {\n%s}\n"%(n, props(p, 1)) for n, p in self])
- def __init__(self, name=None, config=Config(), pyinclude=None, wait=True, perform_teardown=True):
+ def __init__(self, name=None, config=Config(), pyinclude=None, wait=True, perform_teardown=True, cl_args=[]):
"""
@param name: name used for for output files, default to id from config.
@param config: router configuration
@@ -339,7 +339,7 @@ class Qdrouterd(Process):
if not default_log:
config.append(
('log', {'module':'DEFAULT', 'enable':'trace+', 'includeSource': 'true', 'outputFile':name+'.log'}))
- args = ['qdrouterd', '-c', config.write(name)]
+ args = ['qdrouterd', '-c', config.write(name)] + cl_args
env_home = os.environ.get('QPID_DISPATCH_HOME')
if pyinclude:
args += ['-I', pyinclude]
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d5bbd422/tests/system_tests_core_endpoint.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_core_endpoint.py b/tests/system_tests_core_endpoint.py
new file mode 100644
index 0000000..14f4fbf
--- /dev/null
+++ b/tests/system_tests_core_endpoint.py
@@ -0,0 +1,231 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest2 as unittest
+from proton import Message, Timeout
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+from qpid_dispatch_internal.compat import UNICODE
+from qpid_dispatch.management.client import Node
+
+
+class RouterTest(TestCase):
+
+ inter_router_port = None
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router"""
+ super(RouterTest, cls).setUpClass()
+
+ def router(name, connection, args=[]):
+
+ config = [
+ ('router', {'mode': 'interior', 'id': name}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),
+ connection
+ ]
+
+ config = Qdrouterd.Config(config)
+
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=args))
+
+ cls.routers = []
+ inter_router_port = cls.tester.get_port()
+
+ router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}), ["-T"])
+
+
+ def test_01_denied_link(self):
+ test = DenyLinkTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/deny")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_02_discard_deliveries(self):
+ test = DiscardTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/discard")
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_03_presettled_source(self):
+ test = SourceTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/source_ps", 300, 300)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_04_unsettled_source(self):
+ test = SourceTest(self.routers[0].addresses[0], "org.apache.qpid.dispatch.router/test/source", 300, 0)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+class Timeout(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.timeout()
+
+
+class DenyLinkTest(MessagingHandler):
+ def __init__(self, host, address):
+ super(DenyLinkTest, self).__init__(prefetch = 0)
+ self.host = host
+ self.address = address
+
+ self.conn = None
+ self.error = None
+ self.receiver = None
+ self.sender = None
+ self.receiver_failed = False
+ self.sender_failed = False
+
+ def timeout(self):
+ self.error = "Timeout Expired: receiver_failed=%s sender_failed=%s" %\
+ ("yes" if self.receiver_failed else "no",
+ "yes" if self.sender_failed else "no")
+ self.conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5.0, Timeout(self))
+ self.conn = event.container.connect(self.host)
+ self.receiver = event.container.create_receiver(self.conn, self.address)
+ self.sender = event.container.create_sender(self.conn, self.address)
+
+ def on_link_error(self, event):
+ if event.receiver == self.receiver:
+ self.receiver_failed = True
+ if event.sender == self.sender:
+ self.sender_failed = True
+
+ if self.receiver_failed and self.sender_failed:
+ self.conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+class DiscardTest(MessagingHandler):
+ def __init__(self, host, address):
+ super(DiscardTest, self).__init__(prefetch = 0)
+ self.host = host
+ self.address = address
+
+ self.conn = None
+ self.error = None
+ self.sender = None
+
+ self.count = 300
+ self.sent = 0
+ self.rejected = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired: n_sent=%d n_rejected=%d" % (self.sent, self.rejected)
+ self.conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5.0, Timeout(self))
+ self.conn = event.container.connect(self.host)
+ self.sender = event.container.create_sender(self.conn, self.address)
+
+ def on_sendable(self, event):
+ while self.sender.credit > 0 and self.sent < self.count:
+ msg = Message(body="Discard Test")
+ self.sender.send(msg)
+ self.sent += 1
+
+ def on_rejected(self, event):
+ self.rejected += 1
+ self.conn.close()
+ self.timer.cancel()
+
+ def on_link_error(self, event):
+ if event.receiver == self.receiver:
+ self.receiver_failed = True
+ if event.sender == self.sender:
+ self.sender_failed = True
+
+ if self.receiver_failed and self.sender_failed:
+ self.conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+class SourceTest(MessagingHandler):
+ def __init__(self, host, address, count, expected_ps):
+ super(SourceTest, self).__init__(prefetch = 0)
+ self.host = host
+ self.address = address
+ self.expected_ps = expected_ps
+
+ self.conn = None
+ self.error = None
+ self.receiver = None
+
+ self.count = count
+ self.n_credit_given = 0
+ self.n_rcvd = 0
+ self.n_rcvd_ps = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired: n_rcvd=%d" % (self.n_rcvd)
+ self.conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+ self.conn = event.container.connect(self.host)
+ self.receiver = event.container.create_receiver(self.conn, self.address)
+ self.receiver.flow(3)
+ self.n_credit_given = 3
+
+ def on_message(self, event):
+ dlv = event.delivery
+ if dlv.settled:
+ self.n_rcvd_ps += 1
+ self.n_rcvd += 1
+ if self.n_rcvd == self.count:
+ self.conn.close()
+ self.timer.cancel()
+ if self.n_rcvd_ps != self.expected_ps:
+ self.error = "Received %d deliveries, %d were settled (expected %d)" %\
+ (self.n_rcvd, self.n_rcvd_ps, self.expected_ps)
+
+ elif self.n_rcvd == self.n_credit_given:
+ self.receiver.flow(5)
+ self.n_credit_given += 5
+
+ def run(self):
+ Container(self).run()
+
+
+if __name__ == '__main__':
+ unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org