You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by ganeshmurthy <gi...@git.apache.org> on 2018/10/30 20:41:32 UTC
[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...
GitHub user ganeshmurthy opened a pull request:
https://github.com/apache/qpid-dispatch/pull/410
DISPATCH-1160 - Added edge address tracking module to interior router
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ganeshmurthy/qpid-dispatch EDGE-ADDR-TRACKING
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/qpid-dispatch/pull/410.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #410
----
commit cf5f9b7f958fa88ca747b4ec29a1cf2b0932aa33
Author: Ganesh Murthy <gm...@...>
Date: 2018-10-18T15:46:42Z
DISPATCH-1160 - Added edge address tracking module to interior router
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org
[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...
Posted by ted-ross <gi...@git.apache.org>.
Github user ted-ross commented on a diff in the pull request:
https://github.com/apache/qpid-dispatch/pull/410#discussion_r229782221
--- Diff: src/router_core/modules/edge_addr_tracking/edge_addr_tracking.h ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
--- End diff --
This header file is not needed. As a module, this component offers no API.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org
[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/qpid-dispatch/pull/410
---
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org
[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...
Posted by ted-ross <gi...@git.apache.org>.
Github user ted-ross commented on a diff in the pull request:
https://github.com/apache/qpid-dispatch/pull/410#discussion_r229791449
--- Diff: src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/amqp.h>
+#include "module.h"
+#include "core_link_endpoint.h"
+#include "edge_addr_tracking.h"
+#include <stdio.h>
+
+
+struct qdr_addr_endpoint_state_t {
+ DEQ_LINKS(qdr_addr_endpoint_state_t);
+ qdrc_endpoint_t *endpoint;
+ qdr_connection_t *conn; // The connection associated with the endpoint.
+ qdr_addr_tracking_module_context_t *mc;
+};
+
+DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
+ALLOC_DECLARE(qdr_addr_endpoint_state_t);
+ALLOC_DEFINE(qdr_addr_endpoint_state_t);
+
+struct qdr_addr_tracking_module_context_t {
+ qdr_core_t *core;
+ qdr_addr_endpoint_state_list_t endpoint_state_list;
+ qdrc_event_subscription_t *event_sub;
+ qdrc_endpoint_desc_t addr_tracking_endpoint;
+};
+
+
+static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_t *addr, bool insert_addr)
+{
+ qd_message_t *msg = qd_message();
+
+ //
+ // Start header
+ //
+ qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+ qd_compose_start_list(fld);
+ qd_compose_insert_bool(fld, 0); // durable
+ qd_compose_end_list(fld);
+
+ qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+
+ qd_compose_start_list(body);
+
+ const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle);
+
+ qd_compose_insert_string(body, addr_str);
+ qd_compose_insert_bool(body, insert_addr);
+ qd_compose_end_list(body);
+
+ // Finally, compose and return the message so it can be sent out.
+ qd_message_compose_3(msg, fld, body);
+
+ return msg;
+}
+
+static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link)
+{
+ qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list);
+ while(endpoint_state) {
+ if (endpoint_state->conn == conn) {
+ return endpoint_state;
+ }
+ endpoint_state = DEQ_NEXT(endpoint_state);
+ }
+ return 0;
+}
+
+
+static void qdrc_address_endpoint_first_attach(void *bind_context,
+ qdrc_endpoint_t *endpoint,
+ void **link_context,
+ qdr_terminus_t *remote_source,
+ qdr_terminus_t *remote_target)
+{
+ qdr_addr_tracking_module_context_t *bc = (qdr_addr_tracking_module_context_t *) bind_context;
+
+ qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t();
+
+ ZERO(endpoint_state);
+ endpoint_state->endpoint = endpoint;
+ endpoint_state->mc = bc;
+ endpoint_state->conn = qdrc_endpoint_get_connection_CT(endpoint);
+
+ DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
+
+
+ //
+ // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should be created only if this is a receiver link
+ // and if this link is created inside the QDR_ROLE_EDGE_CONNECTION connection.
+ //
+ if (qdrc_endpoint_get_direction_CT(endpoint) == QD_OUTGOING && qdrc_endpoint_get_connection_CT(endpoint)->role == QDR_ROLE_EDGE_CONNECTION) {
+ *link_context = endpoint_state;
+ qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, remote_target);
+ }
+ else {
+ //
+ // We simply detach any links that don't match the above condition.
+ //
+ *link_context = 0;
+ qdrc_endpoint_detach_CT(bc->core, endpoint, 0);
+ }
+}
+
+
+static void qdrc_address_endpoint_on_first_detach(void *link_context,
+ qdr_error_t *error)
+{
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)link_context;
+ qdrc_endpoint_detach_CT(endpoint_state->mc->core, endpoint_state->endpoint, 0);
+ qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
+ DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
+ free_qdr_addr_endpoint_state_t(endpoint_state);
+}
+
+
+static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn)
+{
+ if (!addr)
+ return false;
+
+ bool can_send = false;
+ if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) > 0) {
+ // There is at least one receiver for this address somewhere in the router network
+ can_send = true;
+ }
+ if (!can_send) {
+ if (DEQ_SIZE(addr->rlinks) == 1) {
+ qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
+ if (link_ref->link->conn != conn)
+ can_send=true;
+ }
+ }
+ return can_send;
+}
+
+
+static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoint_t *endpoint, bool insert_addr)
+{
+ if (!addr)
+ return;
+
+ if (!endpoint)
+ return;
+
+ qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr);
+ qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg);
+
+ qdrc_endpoint_send_CT(core, endpoint, dlv, true);
+}
+
+static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr)
+{
+ // We only care about mobile addresses.
+ if(!qdr_address_is_mobile_CT(addr))
+ return;
+
+ qdr_addr_tracking_module_context_t *addr_tracking = (qdr_addr_tracking_module_context_t*) context;
+ switch (event) {
+ case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : {
+ //
+ // This address transitioned from zero to one local destination. If this address already has more than zero remote destinations, don't do anything
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed of the appearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr, endpoint, true);
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ break;
+ }
+ case QDRC_EVENT_ADDR_BECAME_DEST : {
+ //
+ // This address transitioned from zero to one destination. If this address already had local destinations
+ //
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed of the appearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr, endpoint, true);
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ break;
+
+ case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : {
+ // The address no longer has any local destinations.
+ // If there are no remote destinations either, we have to tell the edge routers to delete their sender links
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed of the disappearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr, endpoint, false);
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+
+ break;
+ }
+ case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: {
+ //
+ // This address transitioned from N destinations to one local dest
+ // If this address already has non-zero remote destinations, there is no need to tell the edge routers about it
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+ assert(DEQ_SIZE(addr->rlinks) == 1);
+ //
+ // There should be only one rlink in the rlinks list
+ //
+ qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
+ qdr_link_t *link = rlink_ref->link;
+
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ if (endpoint_state->conn == link->conn) {
+ qdrc_send_message(addr_tracking->core, addr, endpoint, false);
+ break;
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ }
+ break;
+ case QDRC_EVENT_ADDR_TWO_DEST: {
+ // The address transitioned from one local dest to two destinations. The second destination might be local or remote.
+ qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
+ qdr_link_t *link = rlink_ref->link;
+
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ if (link->conn == endpoint_state->conn) {
+ qdrc_send_message(addr_tracking->core, addr, endpoint, true);
--- End diff --
Will this cause redundant messages to be sent to the edge?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org
[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...
Posted by ted-ross <gi...@git.apache.org>.
Github user ted-ross commented on a diff in the pull request:
https://github.com/apache/qpid-dispatch/pull/410#discussion_r229788582
--- Diff: tests/system_tests_edge_router.py ---
@@ -345,5 +620,420 @@ def run(self):
Container(self).run()
-if __name__ == '__main__':
+class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
+ def __init__(self, receiver1_host, receiver2_host, sender_host, address):
+ super(MobileAddressOneSenderTwoReceiversTest, self).__init__()
+ self.receiver1_host = receiver1_host
+ self.receiver2_host = receiver2_host
+ self.sender_host = sender_host
+ self.address = address
+
+ # One sender connection and two receiver connections
+ self.receiver1_conn = None
+ self.receiver2_conn = None
+ self.sender_conn = None
+
+ self.receiver1 = None
+ self.receiver2 = None
+ self.sender = None
+
+ self.count = 300
+ self.rel_count = 50
+ self.n_rcvd1 = 0
+ self.n_rcvd2 = 0
+ self.n_sent = 0
+ self.n_settled = 0
+ self.n_released = 0
+ self.error = None
+ self.timer = None
+ self.all_msgs_received = False
+ self.recvd_msg_bodies = dict()
+ self.dup_msg = None
+
+ def timeout(self):
+ if self.dup_msg:
+ self.error = "Duplicate message %s received " % self.dup_msg
+ else:
+ self.error = "Timeout Expired - n_sent=%d n_rcvd=%d n_settled=%d n_released=%d addr=%s" % \
+ (self.n_sent, (self.n_rcvd1 + self.n_rcvd2), self.n_settled, self.n_released, self.address)
+
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.sender_conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5.0, Timeout(self))
+
+ # Create two receivers
+ self.receiver1_conn = event.container.connect(self.receiver1_host)
+ self.receiver2_conn = event.container.connect(self.receiver2_host)
+ self.receiver1 = event.container.create_receiver(self.receiver1_conn,
+ self.address)
+ self.receiver2 = event.container.create_receiver(self.receiver2_conn,
+ self.address)
+
+ # Create one sender
+ self.sender_conn = event.container.connect(self.sender_host)
+ self.sender = event.container.create_sender(self.sender_conn,
+ self.address)
+
+ def on_sendable(self, event):
+ while self.n_sent < self.count:
+ self.sender.send(Message(body="Message %d" % self.n_sent))
+ self.n_sent += 1
+
+ def on_message(self, event):
+ if self.recvd_msg_bodies.get(event.message.body):
+ self.dup_msg = event.message.body
+ self.timeout()
+ else:
+ self.recvd_msg_bodies[event.message.body] = event.message.body
+
+ if event.receiver == self.receiver1:
+ self.n_rcvd1 += 1
+ if event.receiver == self.receiver2:
+ self.n_rcvd2 += 1
+
+ if self.n_sent == self.n_rcvd1 + self.n_rcvd2:
+ self.all_msgs_received = True
+
+ def on_settled(self, event):
+ self.n_settled += 1
+ if self.n_settled == self.count:
+ self.receiver1.close()
+ self.receiver2.close()
+ for i in range(self.rel_count):
+ self.sender.send(Message(body="Message %d" % self.n_sent))
+ self.n_sent += 1
+
+ def on_released(self, event):
+ self.n_released += 1
+ if self.n_released == self.rel_count and self.all_msgs_received:
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.sender_conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+class MobileAddressMulticastTest(MessagingHandler):
+ def __init__(self, receiver1_host, receiver2_host, receiver3_host,
+ sender_host, address, large_msg=False, check_addr=False):
+ super(MobileAddressMulticastTest, self).__init__()
+ self.receiver1_host = receiver1_host
+ self.receiver2_host = receiver2_host
+ self.receiver3_host = receiver3_host
+ self.sender_host = sender_host
+ self.address = address
+
+ # One sender connection and three receiver connections
+ self.receiver1_conn = None
+ self.receiver2_conn = None
+ self.receiver3_conn = None
+ self.sender_conn = None
+
+ self.receiver1 = None
+ self.receiver2 = None
+ self.receiver3 = None
+ self.sender = None
+
+ self.count = 200
+ self.n_rcvd1 = 0
+ self.n_rcvd2 = 0
+ self.n_rcvd3 = 0
+ self.n_sent = 0
+ self.n_settled = 0
+ self.n_released = 0
+ self.error = None
+ self.timer = None
+ self.all_msgs_received = False
+ self.recvd1_msgs = dict()
+ self.recvd2_msgs = dict()
+ self.recvd3_msgs = dict()
+ self.dup_msg_rcvd = False
+ self.dup_msg = None
+ self.receiver_name = None
+ self.large_msg = large_msg
+ self.body = ""
+ self.r_attaches = 0
+ self.addr_timer = None
+ self.num_attempts = 0
+ self.container = None
+ self.check_addr = check_addr
+
+ if self.large_msg:
+ for i in range(10000):
+ self.body += "0123456789101112131415"
+
+ def timeout(self):
+ if self.dup_msg:
+ self.error = "%s received duplicate message %s" % \
+ (self.receiver_name, self.dup_msg)
+ else:
+ if not self.error:
+ self.error = "Timeout Expired - n_sent=%d n_rcvd1=%d " \
+ "n_rcvd2=%d n_rcvd3=%d addr=%s" % \
+ (self.n_sent, self.n_rcvd1, self.n_rcvd2,
+ self.n_rcvd3, self.address)
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.receiver3_conn.close()
+ if self.sender_conn:
+ self.sender_conn.close()
+
+ def check_address(self):
+ local_node = Node.connect(self.sender_host, timeout=TIMEOUT)
--- End diff --
Is Node a synchronous client? If so, I don't think it should be used in a reactive environment.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org
[GitHub] qpid-dispatch issue #410: DISPATCH-1160 - Added edge address tracking module...
Posted by codecov-io <gi...@git.apache.org>.
Github user codecov-io commented on the issue:
https://github.com/apache/qpid-dispatch/pull/410
# [Codecov](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=h1) Report
> Merging [#410](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=desc) into [master](https://codecov.io/gh/apache/qpid-dispatch/commit/413d727b6e1772d87a23f88261cdced4940d33ee?src=pr&el=desc) will **increase** coverage by `0.27%`.
> The diff coverage is `89.5%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/qpid-dispatch/pull/410/graphs/tree.svg?width=650&token=rk2Cgd27pP&height=150&src=pr)](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #410 +/- ##
==========================================
+ Coverage 85.16% 85.44% +0.27%
==========================================
Files 78 79 +1
Lines 17429 17591 +162
==========================================
+ Hits 14843 15030 +187
+ Misses 2586 2561 -25
```
| [Impacted Files](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [src/router\_core/router\_core.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL3JvdXRlcl9jb3JlLmM=) | `93.27% <85.71%> (-0.16%)` | :arrow_down: |
| [...re/modules/edge\_addr\_tracking/edge\_addr\_tracking.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL21vZHVsZXMvZWRnZV9hZGRyX3RyYWNraW5nL2VkZ2VfYWRkcl90cmFja2luZy5j) | `89.67% <89.67%> (ø)` | |
| [src/router\_node.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9ub2RlLmM=) | `93.78% <0%> (+0.12%)` | :arrow_up: |
| [src/router\_core/connections.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL2Nvbm5lY3Rpb25zLmM=) | `95.48% <0%> (+0.21%)` | :arrow_up: |
| [src/router\_core/transfer.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL3RyYW5zZmVyLmM=) | `90.31% <0%> (+0.44%)` | :arrow_up: |
| [src/router\_core/agent\_link.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL2FnZW50X2xpbmsuYw==) | `63.84% <0%> (+0.56%)` | :arrow_up: |
| [src/router\_core/core\_timer.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL2NvcmVfdGltZXIuYw==) | `96.42% <0%> (+1.78%)` | :arrow_up: |
| [src/router\_core/core\_events.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL2NvcmVfZXZlbnRzLmM=) | `100% <0%> (+11.36%)` | :arrow_up: |
| [src/router\_core/modules/edge\_router/addr\_proxy.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL21vZHVsZXMvZWRnZV9yb3V0ZXIvYWRkcl9wcm94eS5j) | `86.61% <0%> (+20.42%)` | :arrow_up: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=footer). Last update [413d727...911abcb](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org
[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...
Posted by ted-ross <gi...@git.apache.org>.
Github user ted-ross commented on a diff in the pull request:
https://github.com/apache/qpid-dispatch/pull/410#discussion_r229790940
--- Diff: src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/amqp.h>
+#include "module.h"
+#include "core_link_endpoint.h"
+#include "edge_addr_tracking.h"
+#include <stdio.h>
+
+
+struct qdr_addr_endpoint_state_t {
+ DEQ_LINKS(qdr_addr_endpoint_state_t);
+ qdrc_endpoint_t *endpoint;
+ qdr_connection_t *conn; // The connection associated with the endpoint.
+ qdr_addr_tracking_module_context_t *mc;
+};
+
+DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
+ALLOC_DECLARE(qdr_addr_endpoint_state_t);
+ALLOC_DEFINE(qdr_addr_endpoint_state_t);
+
+struct qdr_addr_tracking_module_context_t {
+ qdr_core_t *core;
+ qdr_addr_endpoint_state_list_t endpoint_state_list;
+ qdrc_event_subscription_t *event_sub;
+ qdrc_endpoint_desc_t addr_tracking_endpoint;
+};
+
+
+static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_t *addr, bool insert_addr)
+{
+ qd_message_t *msg = qd_message();
+
+ //
+ // Start header
+ //
+ qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+ qd_compose_start_list(fld);
+ qd_compose_insert_bool(fld, 0); // durable
+ qd_compose_end_list(fld);
+
+ qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+
+ qd_compose_start_list(body);
+
+ const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle);
+
+ qd_compose_insert_string(body, addr_str);
+ qd_compose_insert_bool(body, insert_addr);
+ qd_compose_end_list(body);
+
+ // Finally, compose and return the message so it can be sent out.
+ qd_message_compose_3(msg, fld, body);
+
+ return msg;
+}
+
+static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link)
+{
+ qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list);
+ while(endpoint_state) {
+ if (endpoint_state->conn == conn) {
+ return endpoint_state;
+ }
+ endpoint_state = DEQ_NEXT(endpoint_state);
+ }
+ return 0;
+}
+
+
+static void qdrc_address_endpoint_first_attach(void *bind_context,
+ qdrc_endpoint_t *endpoint,
+ void **link_context,
+ qdr_terminus_t *remote_source,
+ qdr_terminus_t *remote_target)
+{
+ qdr_addr_tracking_module_context_t *bc = (qdr_addr_tracking_module_context_t *) bind_context;
+
+ qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t();
+
+ ZERO(endpoint_state);
+ endpoint_state->endpoint = endpoint;
+ endpoint_state->mc = bc;
+ endpoint_state->conn = qdrc_endpoint_get_connection_CT(endpoint);
+
+ DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
+
+
+ //
+ // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should be created only if this is a receiver link
+ // and if this link is created inside the QDR_ROLE_EDGE_CONNECTION connection.
+ //
+ if (qdrc_endpoint_get_direction_CT(endpoint) == QD_OUTGOING && qdrc_endpoint_get_connection_CT(endpoint)->role == QDR_ROLE_EDGE_CONNECTION) {
+ *link_context = endpoint_state;
+ qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, remote_target);
+ }
+ else {
+ //
+ // We simply detach any links that don't match the above condition.
+ //
+ *link_context = 0;
+ qdrc_endpoint_detach_CT(bc->core, endpoint, 0);
+ }
+}
+
+
+static void qdrc_address_endpoint_on_first_detach(void *link_context,
+ qdr_error_t *error)
+{
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)link_context;
+ qdrc_endpoint_detach_CT(endpoint_state->mc->core, endpoint_state->endpoint, 0);
+ qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
+ DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
+ free_qdr_addr_endpoint_state_t(endpoint_state);
+}
+
+
+static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn)
+{
+ if (!addr)
+ return false;
+
+ bool can_send = false;
+ if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) > 0) {
+ // There is at least one receiver for this address somewhere in the router network
+ can_send = true;
+ }
+ if (!can_send) {
+ if (DEQ_SIZE(addr->rlinks) == 1) {
+ qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
+ if (link_ref->link->conn != conn)
+ can_send=true;
+ }
+ }
+ return can_send;
+}
+
+
+static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoint_t *endpoint, bool insert_addr)
+{
+ if (!addr)
+ return;
+
+ if (!endpoint)
+ return;
+
+ qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr);
+ qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg);
+
+ qdrc_endpoint_send_CT(core, endpoint, dlv, true);
+}
+
+static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr)
+{
+ // We only care about mobile addresses.
+ if(!qdr_address_is_mobile_CT(addr))
+ return;
+
+ qdr_addr_tracking_module_context_t *addr_tracking = (qdr_addr_tracking_module_context_t*) context;
+ switch (event) {
+ case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : {
+ //
+ // This address transitioned from zero to one local destination. If this address already has more than zero remote destinations, don't do anything
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed of the appearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr, endpoint, true);
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ break;
+ }
+ case QDRC_EVENT_ADDR_BECAME_DEST : {
+ //
+ // This address transitioned from zero to one destination. If this address already had local destinations
+ //
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed of the appearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr, endpoint, true);
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ break;
+
+ case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : {
+ // The address no longer has any local destinations.
+ // If there are no remote destinations either, we have to tell the edge routers to delete their sender links
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed of the disappearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr, endpoint, false);
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+
+ break;
+ }
+ case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: {
+ //
+ // This address transitioned from N destinations to one local dest
+ // If this address already has non-zero remote destinations, there is no need to tell the edge routers about it
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
--- End diff --
This is an unnecessary check. The cardinality of rnodes is, by definition, zero.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org
[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...
Posted by ted-ross <gi...@git.apache.org>.
Github user ted-ross commented on a diff in the pull request:
https://github.com/apache/qpid-dispatch/pull/410#discussion_r229788028
--- Diff: tests/system_tests_edge_router.py ---
@@ -345,5 +620,420 @@ def run(self):
Container(self).run()
-if __name__ == '__main__':
+class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
+ def __init__(self, receiver1_host, receiver2_host, sender_host, address):
+ super(MobileAddressOneSenderTwoReceiversTest, self).__init__()
+ self.receiver1_host = receiver1_host
+ self.receiver2_host = receiver2_host
+ self.sender_host = sender_host
+ self.address = address
+
+ # One sender connection and two receiver connections
+ self.receiver1_conn = None
+ self.receiver2_conn = None
+ self.sender_conn = None
+
+ self.receiver1 = None
+ self.receiver2 = None
+ self.sender = None
+
+ self.count = 300
+ self.rel_count = 50
+ self.n_rcvd1 = 0
+ self.n_rcvd2 = 0
+ self.n_sent = 0
+ self.n_settled = 0
+ self.n_released = 0
+ self.error = None
+ self.timer = None
+ self.all_msgs_received = False
+ self.recvd_msg_bodies = dict()
+ self.dup_msg = None
+
+ def timeout(self):
+ if self.dup_msg:
+ self.error = "Duplicate message %s received " % self.dup_msg
+ else:
+ self.error = "Timeout Expired - n_sent=%d n_rcvd=%d n_settled=%d n_released=%d addr=%s" % \
+ (self.n_sent, (self.n_rcvd1 + self.n_rcvd2), self.n_settled, self.n_released, self.address)
+
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.sender_conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5.0, Timeout(self))
+
+ # Create two receivers
+ self.receiver1_conn = event.container.connect(self.receiver1_host)
+ self.receiver2_conn = event.container.connect(self.receiver2_host)
+ self.receiver1 = event.container.create_receiver(self.receiver1_conn,
+ self.address)
+ self.receiver2 = event.container.create_receiver(self.receiver2_conn,
+ self.address)
+
+ # Create one sender
+ self.sender_conn = event.container.connect(self.sender_host)
+ self.sender = event.container.create_sender(self.sender_conn,
+ self.address)
+
+ def on_sendable(self, event):
+ while self.n_sent < self.count:
+ self.sender.send(Message(body="Message %d" % self.n_sent))
+ self.n_sent += 1
+
+ def on_message(self, event):
+ if self.recvd_msg_bodies.get(event.message.body):
+ self.dup_msg = event.message.body
+ self.timeout()
+ else:
+ self.recvd_msg_bodies[event.message.body] = event.message.body
+
+ if event.receiver == self.receiver1:
+ self.n_rcvd1 += 1
+ if event.receiver == self.receiver2:
+ self.n_rcvd2 += 1
+
+ if self.n_sent == self.n_rcvd1 + self.n_rcvd2:
+ self.all_msgs_received = True
+
+ def on_settled(self, event):
+ self.n_settled += 1
+ if self.n_settled == self.count:
+ self.receiver1.close()
+ self.receiver2.close()
+ for i in range(self.rel_count):
+ self.sender.send(Message(body="Message %d" % self.n_sent))
+ self.n_sent += 1
+
+ def on_released(self, event):
+ self.n_released += 1
+ if self.n_released == self.rel_count and self.all_msgs_received:
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.sender_conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+class MobileAddressMulticastTest(MessagingHandler):
+ def __init__(self, receiver1_host, receiver2_host, receiver3_host,
+ sender_host, address, large_msg=False, check_addr=False):
+ super(MobileAddressMulticastTest, self).__init__()
+ self.receiver1_host = receiver1_host
+ self.receiver2_host = receiver2_host
+ self.receiver3_host = receiver3_host
+ self.sender_host = sender_host
+ self.address = address
+
+ # One sender connection and three receiver connections
+ self.receiver1_conn = None
+ self.receiver2_conn = None
+ self.receiver3_conn = None
+ self.sender_conn = None
+
+ self.receiver1 = None
+ self.receiver2 = None
+ self.receiver3 = None
+ self.sender = None
+
+ self.count = 200
+ self.n_rcvd1 = 0
+ self.n_rcvd2 = 0
+ self.n_rcvd3 = 0
+ self.n_sent = 0
+ self.n_settled = 0
+ self.n_released = 0
+ self.error = None
+ self.timer = None
+ self.all_msgs_received = False
+ self.recvd1_msgs = dict()
+ self.recvd2_msgs = dict()
+ self.recvd3_msgs = dict()
+ self.dup_msg_rcvd = False
+ self.dup_msg = None
+ self.receiver_name = None
+ self.large_msg = large_msg
+ self.body = ""
+ self.r_attaches = 0
+ self.addr_timer = None
+ self.num_attempts = 0
+ self.container = None
+ self.check_addr = check_addr
+
+ if self.large_msg:
+ for i in range(10000):
+ self.body += "0123456789101112131415"
+
+ def timeout(self):
+ if self.dup_msg:
+ self.error = "%s received duplicate message %s" % \
+ (self.receiver_name, self.dup_msg)
+ else:
+ if not self.error:
+ self.error = "Timeout Expired - n_sent=%d n_rcvd1=%d " \
+ "n_rcvd2=%d n_rcvd3=%d addr=%s" % \
+ (self.n_sent, self.n_rcvd1, self.n_rcvd2,
+ self.n_rcvd3, self.address)
+ self.receiver1_conn.close()
+ self.receiver2_conn.close()
+ self.receiver3_conn.close()
+ if self.sender_conn:
+ self.sender_conn.close()
+
+ def check_address(self):
+ local_node = Node.connect(self.sender_host, timeout=TIMEOUT)
+ outs = local_node.query(type='org.apache.qpid.dispatch.router.address')
+ found = False
+ for result in outs.results:
+ if self.address in result[0]:
+ found = True
+ self.sender_conn = self.container.connect(self.sender_host)
+ self.sender = self.container.create_sender(self.sender_conn,
+ self.address)
+ local_node.close()
+ break
+
+ if not found:
+ self.error = "Unable to create sender because of " \
+ "absence of address in the address table"
+ self.addr_timer.cancel()
+ self.timeout()
+ local_node.close()
+
+ def create_sndr(self):
+ self.sender_conn = self.container.connect(self.sender_host)
+ self.sender = self.container.create_sender(self.sender_conn,
+ self.address)
+
+ def on_start(self, event):
+ if self.large_msg:
+ self.timer = event.reactor.schedule(10.0, Timeout(self))
+ else:
+ self.timer = event.reactor.schedule(20.0, Timeout(self))
+
+ # Create three receivers
+ self.receiver1_conn = event.container.connect(self.receiver1_host)
+ self.receiver2_conn = event.container.connect(self.receiver2_host)
+ self.receiver3_conn = event.container.connect(self.receiver3_host)
+ self.receiver1 = event.container.create_receiver(self.receiver1_conn,
+ self.address)
+ self.receiver2 = event.container.create_receiver(self.receiver2_conn,
+ self.address)
+ self.receiver3 = event.container.create_receiver(self.receiver3_conn,
+ self.address)
+ self.container = event.container
+
+ def on_link_opened(self, event):
+ if event.receiver == self.receiver1 or \
+ event.receiver == self.receiver2 or \
+ event.receiver == self.receiver3:
+ self.r_attaches += 1
+ if self.r_attaches == 3:
+ self.addr_timer = event.reactor.schedule(4.0,
--- End diff --
Why do you wait so long? These tests take a very long time to run.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org