Posted to dev@qpid.apache.org by ganeshmurthy <gi...@git.apache.org> on 2018/10/30 20:41:32 UTC

[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...

GitHub user ganeshmurthy opened a pull request:

    https://github.com/apache/qpid-dispatch/pull/410

    DISPATCH-1160 - Added edge address tracking module to interior router

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ganeshmurthy/qpid-dispatch EDGE-ADDR-TRACKING

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/qpid-dispatch/pull/410.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #410
    
----
commit cf5f9b7f958fa88ca747b4ec29a1cf2b0932aa33
Author: Ganesh Murthy <gm...@...>
Date:   2018-10-18T15:46:42Z

    DISPATCH-1160 - Added edge address tracking module to interior router

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...

Posted by ted-ross <gi...@git.apache.org>.
Github user ted-ross commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/410#discussion_r229782221
  
    --- Diff: src/router_core/modules/edge_addr_tracking/edge_addr_tracking.h ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    +*/
    --- End diff --
    
    This header file is not needed.  As a module, this component offers no API.


---



[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/qpid-dispatch/pull/410


---



[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...

Posted by ted-ross <gi...@git.apache.org>.
Github user ted-ross commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/410#discussion_r229791449
  
    --- Diff: src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c ---
    @@ -0,0 +1,372 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +#include <qpid/dispatch/ctools.h>
    +#include <qpid/dispatch/amqp.h>
    +#include "module.h"
    +#include "core_link_endpoint.h"
    +#include "edge_addr_tracking.h"
    +#include <stdio.h>
    +
    +
    +struct qdr_addr_endpoint_state_t {
    +    DEQ_LINKS(qdr_addr_endpoint_state_t);
    +    qdrc_endpoint_t                    *endpoint;
    +    qdr_connection_t                   *conn;    // The connection associated with the endpoint.
    +    qdr_addr_tracking_module_context_t *mc;
    +};
    +
    +DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
    +ALLOC_DECLARE(qdr_addr_endpoint_state_t);
    +ALLOC_DEFINE(qdr_addr_endpoint_state_t);
    +
    +struct  qdr_addr_tracking_module_context_t {
    +    qdr_core_t                     *core;
    +    qdr_addr_endpoint_state_list_t  endpoint_state_list;
    +    qdrc_event_subscription_t      *event_sub;
    +    qdrc_endpoint_desc_t           addr_tracking_endpoint;
    +};
    +
    +
    +static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_t   *addr, bool insert_addr)
    +{
    +    qd_message_t *msg = qd_message();
    +
    +    //
    +    // Start header
    +    //
    +    qd_composed_field_t *fld   = qd_compose(QD_PERFORMATIVE_HEADER, 0);
    +    qd_compose_start_list(fld);
    +    qd_compose_insert_bool(fld, 0);     // durable
    +    qd_compose_end_list(fld);
    +
    +    qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
    +
    +    qd_compose_start_list(body);
    +
    +    const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle);
    +
    +    qd_compose_insert_string(body, addr_str);
    +    qd_compose_insert_bool(body, insert_addr);
    +    qd_compose_end_list(body);
    +
    +    // Finally, compose and return the message so it can be sent out.
    +    qd_message_compose_3(msg, fld, body);
    +
    +    return msg;
    +}
    +
    +static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t  endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link)
    +{
    +    qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list);
    +    while(endpoint_state) {
    +        if (endpoint_state->conn == conn) {
    +            return endpoint_state;
    +        }
    +        endpoint_state = DEQ_NEXT(endpoint_state);
    +    }
    +    return 0;
    +}
    +
    +
    +static void qdrc_address_endpoint_first_attach(void              *bind_context,
    +                                               qdrc_endpoint_t   *endpoint,
    +                                               void             **link_context,
    +                                               qdr_terminus_t   *remote_source,
    +                                               qdr_terminus_t   *remote_target)
    +{
    +    qdr_addr_tracking_module_context_t *bc = (qdr_addr_tracking_module_context_t *) bind_context;
    +
    +    qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t();
    +
    +    ZERO(endpoint_state);
    +    endpoint_state->endpoint = endpoint;
    +    endpoint_state->mc       = bc;
    +    endpoint_state->conn     = qdrc_endpoint_get_connection_CT(endpoint);
    +
    +    DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
    +
    +
    +    //
    +    // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should be created only if this is a receiver link
    +    // and if this link is created inside the QDR_ROLE_EDGE_CONNECTION connection.
    +    //
    +    if (qdrc_endpoint_get_direction_CT(endpoint) == QD_OUTGOING && qdrc_endpoint_get_connection_CT(endpoint)->role == QDR_ROLE_EDGE_CONNECTION) {
    +        *link_context = endpoint_state;
    +        qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, remote_target);
    +   }
    +    else {
    +        //
    +        // We simply detach any links that don't match the above condition.
    +        //
    +        *link_context = 0;
    +        qdrc_endpoint_detach_CT(bc->core, endpoint, 0);
    +    }
    +}
    +
    +
    +static void qdrc_address_endpoint_on_first_detach(void *link_context,
    +                                              qdr_error_t *error)
    +{
    +    qdr_addr_endpoint_state_t *endpoint_state  = (qdr_addr_endpoint_state_t *)link_context;
    +    qdrc_endpoint_detach_CT(endpoint_state->mc->core, endpoint_state->endpoint, 0);
    +    qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
    +    DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
    +    free_qdr_addr_endpoint_state_t(endpoint_state);
    +}
    +
    +
    +static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn)
    +{
    +    if (!addr)
    +        return false;
    +
    +    bool can_send = false;
    +    if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) > 0) {
    +        // There is at least one receiver for this address somewhere in the router network
    +        can_send = true;
    +    }
    +    if (!can_send) {
    +        if (DEQ_SIZE(addr->rlinks) == 1) {
    +            qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
    +            if (link_ref->link->conn != conn)
    +                can_send=true;
    +        }
    +    }
    +    return can_send;
    +}
    +
    +
    +static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoint_t *endpoint, bool insert_addr)
    +{
    +    if (!addr)
    +        return;
    +
    +    if (!endpoint)
    +        return;
    +
    +    qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr);
    +    qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg);
    +
    +    qdrc_endpoint_send_CT(core, endpoint, dlv, true);
    +}
    +
    +static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr)
    +{
    +    // We only care about mobile addresses.
    +    if(!qdr_address_is_mobile_CT(addr))
    +        return;
    +
    +    qdr_addr_tracking_module_context_t *addr_tracking = (qdr_addr_tracking_module_context_t*) context;
    +    switch (event) {
    +        case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : {
    +            //
    +            // This address transitioned from zero to one local destination. If this address already has more than zero remote destinations, don't do anything
    +            //
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    +                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +                //
    +                // Every inlink that has an edge context must be informed of the appearance of this address.
    +                //
    +                while (inlink) {
    +                    if(inlink->link->edge_context != 0) {
    +                        qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                        if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
    +                            qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                            qdrc_send_message(addr_tracking->core, addr, endpoint, true);
    +                        }
    +                    }
    +                    inlink = DEQ_NEXT(inlink);
    +                }
    +            }
    +            break;
    +        }
    +        case QDRC_EVENT_ADDR_BECAME_DEST : {
    +            //
    +            // This address transitioned from zero to one destination. If this address already had local destinations
    +            //
    +            qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +            //
    +            // Every inlink that has an edge context must be informed of the appearance of this address.
    +            //
    +            while (inlink) {
    +                if(inlink->link->edge_context != 0) {
    +                    qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                    if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
    +                        qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                        qdrc_send_message(addr_tracking->core, addr, endpoint, true);
    +                    }
    +                }
    +                inlink = DEQ_NEXT(inlink);
    +            }
    +        }
    +        break;
    +
    +        case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : {
    +            // The address no longer has any local destinations.
    +            // If there are no remote destinations either, we have to tell the edge routers to delete their sender links
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    +                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +                //
    +                // Every inlink that has an edge context must be informed of the disappearance of this address.
    +                //
    +                while (inlink) {
    +                    if(inlink->link->edge_context != 0) {
    +                        qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                        qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                        qdrc_send_message(addr_tracking->core, addr, endpoint, false);
    +                    }
    +                    inlink = DEQ_NEXT(inlink);
    +                }
    +            }
    +
    +            break;
    +        }
    +        case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: {
    +            //
    +            // This address transitioned from N destinations to one local dest
    +            // If this address already has non-zero remote destinations, there is no need to tell the edge routers about it
    +            //
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    +                assert(DEQ_SIZE(addr->rlinks) == 1);
    +                //
    +                // There should be only one rlink in the rlinks list
    +                //
    +                qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
    +                qdr_link_t *link = rlink_ref->link;
    +
    +                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +                while (inlink) {
    +                    if(inlink->link->edge_context != 0) {
    +                        qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                        qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                        if (endpoint_state->conn == link->conn) {
    +                            qdrc_send_message(addr_tracking->core, addr, endpoint, false);
    +                            break;
    +                        }
    +                    }
    +                    inlink = DEQ_NEXT(inlink);
    +                }
    +            }
    +        }
    +        break;
    +        case QDRC_EVENT_ADDR_TWO_DEST: {
    +            // The address transitioned from one local dest to two destinations. The second destination might be local or remote.
    +            qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
    +            qdr_link_t *link = rlink_ref->link;
    +
    +            qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +            while (inlink) {
    +                if(inlink->link->edge_context != 0) {
    +                    qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                    qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                    if (link->conn == endpoint_state->conn) {
    +                        qdrc_send_message(addr_tracking->core, addr, endpoint, true);
    --- End diff --
    
    Will this cause redundant messages to be sent to the edge?
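
To make the redundancy question concrete, here is a minimal sketch of one way duplicate notifications could be suppressed: remember, per edge connection, which addresses have already been advertised, and send only when that state changes. This is not what the pull request does. The class `EdgeAdvertiser`, its `notify` method, and the string connection names are hypothetical stand-ins for illustration, written in Python rather than the router's C.

```python
class EdgeAdvertiser:
    """Hypothetical model: remembers what each edge connection was last told."""

    def __init__(self):
        self.advertised = {}  # connection name -> set of advertised addresses
        self.sent = []        # log of (connection, address, insert) actually sent

    def notify(self, conn, address, insert):
        """Send an add (insert=True) or delete (insert=False) only on a state change.

        Returns True if a message was sent, False if it would have been redundant.
        """
        current = self.advertised.setdefault(conn, set())
        if insert == (address in current):
            # The edge already holds this state, so a second message adds nothing.
            return False
        if insert:
            current.add(address)
        else:
            current.discard(address)
        self.sent.append((conn, address, insert))
        return True


if __name__ == "__main__":
    adv = EdgeAdvertiser()
    print(adv.notify("edge-1", "examples", True))   # first advertisement
    print(adv.notify("edge-1", "examples", True))   # repeat of the same state
    print(adv.sent)
```

With bookkeeping of this kind, two events that both resolve to "address is reachable" for the same edge connection would produce a single message. Whether the extra state is worth it depends on how often the events actually overlap.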


---



[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...

Posted by ted-ross <gi...@git.apache.org>.
Github user ted-ross commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/410#discussion_r229788582
  
    --- Diff: tests/system_tests_edge_router.py ---
    @@ -345,5 +620,420 @@ def run(self):
             Container(self).run()
     
     
    -if __name__ == '__main__':
    +class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
    +    def __init__(self, receiver1_host, receiver2_host, sender_host, address):
    +        super(MobileAddressOneSenderTwoReceiversTest, self).__init__()
    +        self.receiver1_host = receiver1_host
    +        self.receiver2_host = receiver2_host
    +        self.sender_host = sender_host
    +        self.address = address
    +
    +        # One sender connection and two receiver connections
    +        self.receiver1_conn = None
    +        self.receiver2_conn = None
    +        self.sender_conn   = None
    +
    +        self.receiver1 = None
    +        self.receiver2 = None
    +        self.sender = None
    +
    +        self.count = 300
    +        self.rel_count = 50
    +        self.n_rcvd1 = 0
    +        self.n_rcvd2 = 0
    +        self.n_sent = 0
    +        self.n_settled = 0
    +        self.n_released = 0
    +        self.error = None
    +        self.timer = None
    +        self.all_msgs_received = False
    +        self.recvd_msg_bodies = dict()
    +        self.dup_msg = None
    +
    +    def timeout(self):
    +        if self.dup_msg:
    +            self.error = "Duplicate message %s received " % self.dup_msg
    +        else:
    +            self.error = "Timeout Expired - n_sent=%d n_rcvd=%d n_settled=%d n_released=%d addr=%s" % \
    +                         (self.n_sent, (self.n_rcvd1 + self.n_rcvd2), self.n_settled, self.n_released, self.address)
    +
    +        self.receiver1_conn.close()
    +        self.receiver2_conn.close()
    +        self.sender_conn.close()
    +
    +    def on_start(self, event):
    +        self.timer = event.reactor.schedule(5.0, Timeout(self))
    +
    +        # Create two receivers
    +        self.receiver1_conn = event.container.connect(self.receiver1_host)
    +        self.receiver2_conn = event.container.connect(self.receiver2_host)
    +        self.receiver1 = event.container.create_receiver(self.receiver1_conn,
    +                                                         self.address)
    +        self.receiver2 = event.container.create_receiver(self.receiver2_conn,
    +                                                         self.address)
    +
    +        # Create one sender
    +        self.sender_conn = event.container.connect(self.sender_host)
    +        self.sender = event.container.create_sender(self.sender_conn,
    +                                                    self.address)
    +
    +    def on_sendable(self, event):
    +        while self.n_sent < self.count:
    +            self.sender.send(Message(body="Message %d" % self.n_sent))
    +            self.n_sent += 1
    +
    +    def on_message(self, event):
    +        if self.recvd_msg_bodies.get(event.message.body):
    +            self.dup_msg = event.message.body
    +            self.timeout()
    +        else:
    +            self.recvd_msg_bodies[event.message.body] = event.message.body
    +
    +        if event.receiver == self.receiver1:
    +            self.n_rcvd1 += 1
    +        if event.receiver == self.receiver2:
    +            self.n_rcvd2 += 1
    +
    +        if self.n_sent == self.n_rcvd1 + self.n_rcvd2:
    +            self.all_msgs_received = True
    +
    +    def on_settled(self, event):
    +        self.n_settled += 1
    +        if self.n_settled == self.count:
    +            self.receiver1.close()
    +            self.receiver2.close()
    +            for i in range(self.rel_count):
    +                self.sender.send(Message(body="Message %d" % self.n_sent))
    +                self.n_sent += 1
    +
    +    def on_released(self, event):
    +        self.n_released += 1
    +        if self.n_released == self.rel_count and self.all_msgs_received:
    +            self.receiver1_conn.close()
    +            self.receiver2_conn.close()
    +            self.sender_conn.close()
    +            self.timer.cancel()
    +
    +    def run(self):
    +        Container(self).run()
    +
    +
    +class MobileAddressMulticastTest(MessagingHandler):
    +    def __init__(self, receiver1_host, receiver2_host, receiver3_host,
    +                 sender_host, address, large_msg=False, check_addr=False):
    +        super(MobileAddressMulticastTest, self).__init__()
    +        self.receiver1_host = receiver1_host
    +        self.receiver2_host = receiver2_host
    +        self.receiver3_host = receiver3_host
    +        self.sender_host = sender_host
    +        self.address = address
    +
    +        # One sender connection and three receiver connections
    +        self.receiver1_conn = None
    +        self.receiver2_conn = None
    +        self.receiver3_conn = None
    +        self.sender_conn = None
    +
    +        self.receiver1 = None
    +        self.receiver2 = None
    +        self.receiver3 = None
    +        self.sender = None
    +
    +        self.count = 200
    +        self.n_rcvd1 = 0
    +        self.n_rcvd2 = 0
    +        self.n_rcvd3 = 0
    +        self.n_sent = 0
    +        self.n_settled = 0
    +        self.n_released = 0
    +        self.error = None
    +        self.timer = None
    +        self.all_msgs_received = False
    +        self.recvd1_msgs = dict()
    +        self.recvd2_msgs = dict()
    +        self.recvd3_msgs = dict()
    +        self.dup_msg_rcvd = False
    +        self.dup_msg = None
    +        self.receiver_name = None
    +        self.large_msg = large_msg
    +        self.body = ""
    +        self.r_attaches = 0
    +        self.addr_timer = None
    +        self.num_attempts = 0
    +        self.container = None
    +        self.check_addr = check_addr
    +
    +        if self.large_msg:
    +            for i in range(10000):
    +                self.body += "0123456789101112131415"
    +
    +    def timeout(self):
    +        if self.dup_msg:
    +            self.error = "%s received  duplicate message %s" % \
    +                         (self.receiver_name, self.dup_msg)
    +        else:
    +            if not self.error:
    +                self.error = "Timeout Expired - n_sent=%d n_rcvd1=%d " \
    +                             "n_rcvd2=%d n_rcvd3=%d addr=%s" % \
    +                             (self.n_sent, self.n_rcvd1, self.n_rcvd2,
    +                              self.n_rcvd3, self.address)
    +        self.receiver1_conn.close()
    +        self.receiver2_conn.close()
    +        self.receiver3_conn.close()
    +        if self.sender_conn:
    +            self.sender_conn.close()
    +
    +    def check_address(self):
    +        local_node = Node.connect(self.sender_host, timeout=TIMEOUT)
    --- End diff --
    
    Is Node a synchronous client?  If so, I don't think it should be used in a reactive environment.
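
The concern can be illustrated with a small sketch. It uses Python's standard `asyncio` event loop as a stand-in for the Proton reactor, which is an assumption made purely for illustration. `blocking_query` is a hypothetical placeholder for a synchronous request/response call, and `heartbeat` stands for any other callback the event loop is expected to keep servicing. If the client is indeed synchronous, calling it directly from a handler stalls everything else on the loop until it returns.

```python
import asyncio
import time


async def heartbeat(ticks, stop):
    # Stand-in for other event callbacks that should keep running.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


def blocking_query():
    # Hypothetical synchronous call: blocks the calling thread until done.
    time.sleep(0.2)
    return "ok"


async def run(use_executor):
    ticks, stop = [], asyncio.Event()
    task = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    start = time.monotonic()
    if use_executor:
        # Off-load the blocking call so the event loop stays responsive.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, blocking_query)
    else:
        # Called inline: the event loop cannot run anything else meanwhile.
        result = blocking_query()
    end = time.monotonic()
    stop.set()
    await task
    # Count heartbeat ticks that happened while the query was in flight.
    return result, len([t for t in ticks if start < t <= end])


if __name__ == "__main__":
    print("inline:  ", asyncio.run(run(use_executor=False)))
    print("executor:", asyncio.run(run(use_executor=True)))
```

In the inline case no heartbeat runs while the query is outstanding. In the executor case the heartbeat continues. In a reactive test, the equivalent remedy would be to issue the management query through the same event-driven API as the rest of the handler, or to run the blocking client on a separate thread.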


---



[GitHub] qpid-dispatch issue #410: DISPATCH-1160 - Added edge address tracking module...

Posted by codecov-io <gi...@git.apache.org>.
Github user codecov-io commented on the issue:

    https://github.com/apache/qpid-dispatch/pull/410
  
    # [Codecov](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=h1) Report
    > Merging [#410](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=desc) into [master](https://codecov.io/gh/apache/qpid-dispatch/commit/413d727b6e1772d87a23f88261cdced4940d33ee?src=pr&el=desc) will **increase** coverage by `0.27%`.
    > The diff coverage is `89.5%`.
    
    ```diff
    @@            Coverage Diff             @@
    ##           master     #410      +/-   ##
    ==========================================
    + Coverage   85.16%   85.44%   +0.27%     
    ==========================================
      Files          78       79       +1     
      Lines       17429    17591     +162     
    ==========================================
    + Hits        14843    15030     +187     
    + Misses       2586     2561      -25
    ```
    
    
    | [Impacted Files](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=tree) | Coverage Δ | |
    |---|---|---|
    | [src/router\_core/router\_core.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL3JvdXRlcl9jb3JlLmM=) | `93.27% <85.71%> (-0.16%)` | :arrow_down: |
    | [...re/modules/edge\_addr\_tracking/edge\_addr\_tracking.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL21vZHVsZXMvZWRnZV9hZGRyX3RyYWNraW5nL2VkZ2VfYWRkcl90cmFja2luZy5j) | `89.67% <89.67%> (ø)` | |
    | [src/router\_node.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9ub2RlLmM=) | `93.78% <0%> (+0.12%)` | :arrow_up: |
    | [src/router\_core/connections.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL2Nvbm5lY3Rpb25zLmM=) | `95.48% <0%> (+0.21%)` | :arrow_up: |
    | [src/router\_core/transfer.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL3RyYW5zZmVyLmM=) | `90.31% <0%> (+0.44%)` | :arrow_up: |
    | [src/router\_core/agent\_link.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL2FnZW50X2xpbmsuYw==) | `63.84% <0%> (+0.56%)` | :arrow_up: |
    | [src/router\_core/core\_timer.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL2NvcmVfdGltZXIuYw==) | `96.42% <0%> (+1.78%)` | :arrow_up: |
    | [src/router\_core/core\_events.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL2NvcmVfZXZlbnRzLmM=) | `100% <0%> (+11.36%)` | :arrow_up: |
    | [src/router\_core/modules/edge\_router/addr\_proxy.c](https://codecov.io/gh/apache/qpid-dispatch/pull/410/diff?src=pr&el=tree#diff-c3JjL3JvdXRlcl9jb3JlL21vZHVsZXMvZWRnZV9yb3V0ZXIvYWRkcl9wcm94eS5j) | `86.61% <0%> (+20.42%)` | :arrow_up: |
    
    ------
    
    [Continue to review full report at Codecov](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=continue).
    > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
    > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
    > Powered by [Codecov](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=footer). Last update [413d727...911abcb](https://codecov.io/gh/apache/qpid-dispatch/pull/410?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
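
As a quick check of the figures in the coverage table above, the percentages can be recomputed from the reported hit and line counts. The sketch below assumes that coverage is simply hits divided by lines. The helper name `coverage_percent` is made up for illustration and is not part of any Codecov tooling.

```python
def coverage_percent(hits, lines):
    """Coverage as a percentage, assuming coverage = hits / lines."""
    return 100.0 * hits / lines


# Counts taken from the report: master first, then the pull request.
master_hits, master_lines = 14843, 17429
pr_hits, pr_lines = 15030, 17591

master = coverage_percent(master_hits, master_lines)
pr = coverage_percent(pr_hits, pr_lines)
delta = pr - master

if __name__ == "__main__":
    print(f"master: {master:.2f}%")
    print(f"pr:     {pr:.2f}%")
    print(f"delta:  {delta:+.3f} percentage points")
    # Misses are lines minus hits, matching the 'Misses' row of the report.
    print(master_lines - master_hits, pr_lines - pr_hits)
```

The recomputed values agree with the reported 85.16% and 85.44%. The unrounded difference is slightly under 0.28 points, so the `+0.27%` in the report presumably reflects how the tool rounds or truncates. That explanation is a guess, not something the report states.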



---



[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...

Posted by ted-ross <gi...@git.apache.org>.
Github user ted-ross commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/410#discussion_r229790940
  
    --- Diff: src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c ---
    @@ -0,0 +1,372 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +#include <qpid/dispatch/ctools.h>
    +#include <qpid/dispatch/amqp.h>
    +#include "module.h"
    +#include "core_link_endpoint.h"
    +#include "edge_addr_tracking.h"
    +#include <stdio.h>
    +
    +
    +struct qdr_addr_endpoint_state_t {
    +    DEQ_LINKS(qdr_addr_endpoint_state_t);
    +    qdrc_endpoint_t                    *endpoint;
    +    qdr_connection_t                   *conn;    // The connection associated with the endpoint.
    +    qdr_addr_tracking_module_context_t *mc;
    +};
    +
    +DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
    +ALLOC_DECLARE(qdr_addr_endpoint_state_t);
    +ALLOC_DEFINE(qdr_addr_endpoint_state_t);
    +
    +struct  qdr_addr_tracking_module_context_t {
    +    qdr_core_t                     *core;
    +    qdr_addr_endpoint_state_list_t  endpoint_state_list;
    +    qdrc_event_subscription_t      *event_sub;
    +    qdrc_endpoint_desc_t           addr_tracking_endpoint;
    +};
    +
    +
    +static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_t   *addr, bool insert_addr)
    +{
    +    qd_message_t *msg = qd_message();
    +
    +    //
    +    // Start header
    +    //
    +    qd_composed_field_t *fld   = qd_compose(QD_PERFORMATIVE_HEADER, 0);
    +    qd_compose_start_list(fld);
    +    qd_compose_insert_bool(fld, 0);     // durable
    +    qd_compose_end_list(fld);
    +
    +    qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
    +
    +    qd_compose_start_list(body);
    +
    +    const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle);
    +
    +    qd_compose_insert_string(body, addr_str);
    +    qd_compose_insert_bool(body, insert_addr);
    +    qd_compose_end_list(body);
    +
    +    // Finally, compose and return the message so it can be sent out.
    +    qd_message_compose_3(msg, fld, body);
    +
    +    return msg;
    +}
    +
    +static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t  endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link)
    +{
    +    qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list);
    +    while(endpoint_state) {
    +        if (endpoint_state->conn == conn) {
    +            return endpoint_state;
    +        }
    +        endpoint_state = DEQ_NEXT(endpoint_state);
    +    }
    +    return 0;
    +}
    +
    +
    +static void qdrc_address_endpoint_first_attach(void              *bind_context,
    +                                               qdrc_endpoint_t   *endpoint,
    +                                               void             **link_context,
    +                                               qdr_terminus_t   *remote_source,
    +                                               qdr_terminus_t   *remote_target)
    +{
    +    qdr_addr_tracking_module_context_t *bc = (qdr_addr_tracking_module_context_t *) bind_context;
    +
    +    qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t();
    +
    +    ZERO(endpoint_state);
    +    endpoint_state->endpoint = endpoint;
    +    endpoint_state->mc       = bc;
    +    endpoint_state->conn     = qdrc_endpoint_get_connection_CT(endpoint);
    +
    +    DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
    +
    +
    +    //
    +    // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should be created only if this is a receiver link
    +    // and if this link is created inside the QDR_ROLE_EDGE_CONNECTION connection.
    +    //
    +    if (qdrc_endpoint_get_direction_CT(endpoint) == QD_OUTGOING && qdrc_endpoint_get_connection_CT(endpoint)->role == QDR_ROLE_EDGE_CONNECTION) {
    +        *link_context = endpoint_state;
    +        qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, remote_target);
    +    }
    +    else {
    +        //
    +        // We simply detach any links that don't match the above condition.
    +        //
    +        *link_context = 0;
    +        qdrc_endpoint_detach_CT(bc->core, endpoint, 0);
    +    }
    +}
    +
    +
    +static void qdrc_address_endpoint_on_first_detach(void *link_context,
    +                                              qdr_error_t *error)
    +{
    +    qdr_addr_endpoint_state_t *endpoint_state  = (qdr_addr_endpoint_state_t *)link_context;
    +    qdrc_endpoint_detach_CT(endpoint_state->mc->core, endpoint_state->endpoint, 0);
    +    qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
    +    DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
    +    free_qdr_addr_endpoint_state_t(endpoint_state);
    +}
    +
    +
    +static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn)
    +{
    +    if (!addr)
    +        return false;
    +
    +    bool can_send = false;
    +    if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) > 0) {
    +        // There is at least one receiver for this address somewhere in the router network
    +        can_send = true;
    +    }
    +    if (!can_send) {
    +        if (DEQ_SIZE(addr->rlinks) == 1) {
    +            qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
    +            if (link_ref->link->conn != conn)
    +                can_send=true;
    +        }
    +    }
    +    return can_send;
    +}
    +
    +
    +static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoint_t *endpoint, bool insert_addr)
    +{
    +    if (!addr)
    +        return;
    +
    +    if (!endpoint)
    +        return;
    +
    +    qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr);
    +    qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg);
    +
    +    qdrc_endpoint_send_CT(core, endpoint, dlv, true);
    +}
    +
    +static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr)
    +{
    +    // We only care about mobile addresses.
    +    if(!qdr_address_is_mobile_CT(addr))
    +        return;
    +
    +    qdr_addr_tracking_module_context_t *addr_tracking = (qdr_addr_tracking_module_context_t*) context;
    +    switch (event) {
    +        case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : {
    +            //
    +            // This address transitioned from zero to one local destination. If this address already has more than zero remote destinations, don't do anything
    +            //
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    +                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +                //
    +                // Every inlink that has an edge context must be informed of the appearance of this address.
    +                //
    +                while (inlink) {
    +                    if(inlink->link->edge_context != 0) {
    +                        qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                        if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
    +                            qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                            qdrc_send_message(addr_tracking->core, addr, endpoint, true);
    +                        }
    +                    }
    +                    inlink = DEQ_NEXT(inlink);
    +                }
    +            }
    +            break;
    +        }
    +        case QDRC_EVENT_ADDR_BECAME_DEST : {
    +            //
    +            // This address transitioned from zero to one destination. If this address already had local destinations
    +            //
    +            qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +            //
    +            // Every inlink that has an edge context must be informed of the appearance of this address.
    +            //
    +            while (inlink) {
    +                if(inlink->link->edge_context != 0) {
    +                    qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                    if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
    +                        qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                        qdrc_send_message(addr_tracking->core, addr, endpoint, true);
    +                    }
    +                }
    +                inlink = DEQ_NEXT(inlink);
    +            }
    +        }
    +        break;
    +
    +        case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : {
    +            // The address no longer has any local destinations.
    +            // If there are no remote destinations either, we have to tell the edge routers to delete their sender links
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    +                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +                //
    +                // Every inlink that has an edge context must be informed of the disappearance of this address.
    +                //
    +                while (inlink) {
    +                    if(inlink->link->edge_context != 0) {
    +                        qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                        qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                        qdrc_send_message(addr_tracking->core, addr, endpoint, false);
    +                    }
    +                    inlink = DEQ_NEXT(inlink);
    +                }
    +            }
    +
    +            break;
    +        }
    +        case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: {
    +            //
    +            // This address transitioned from N destinations to one local dest
    +            // If this address already has non-zero remote destinations, there is no need to tell the edge routers about it
    +            //
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    --- End diff --
    
    This is an unnecessary check.  The cardinality of rnodes is, by definition, zero.
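
To make the point concrete, here is a minimal Python model (not router code) of the handler. It assumes, as the comment above states, that the ONE_LOCAL_DEST event is only raised for an address with no remote destinations. The names `Address`, `handler_with_check`, `handler_without_check`, and `raise_one_local_dest` are hypothetical stand-ins invented for this sketch.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Address:
    # Hypothetical stand-in for qdr_address_t: only the fields the handler reads.
    rnodes: int = 0  # number of remote router destinations
    edge_inlinks: List[str] = field(default_factory=list)


def handler_with_check(addr: Address) -> List[str]:
    # Mirrors the code under review: guard on the rnodes cardinality first.
    notified = []
    if addr.rnodes == 0:
        for link in addr.edge_inlinks:
            notified.append(link)
    return notified


def handler_without_check(addr: Address) -> List[str]:
    # Same loop with the guard dropped.
    return list(addr.edge_inlinks)


def raise_one_local_dest(addr: Address,
                         handler: Callable[[Address], List[str]]) -> List[str]:
    # Assumed precondition of the event: no remote destinations exist.
    assert addr.rnodes == 0, "event assumed to fire only when rnodes is empty"
    return handler(addr)


addr = Address(rnodes=0, edge_inlinks=["edge-1", "edge-2"])
with_check = raise_one_local_dest(addr, handler_with_check)
without_check = raise_one_local_dest(addr, handler_without_check)
```

Under that precondition both handlers notify the same links, so the guard only adds a branch that can never be false. If the event can in fact be raised with remote destinations present, the guard is still needed.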


---



[GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...

Posted by ted-ross <gi...@git.apache.org>.
Github user ted-ross commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/410#discussion_r229788028
  
    --- Diff: tests/system_tests_edge_router.py ---
    @@ -345,5 +620,420 @@ def run(self):
             Container(self).run()
     
     
    -if __name__ == '__main__':
    +class MobileAddressOneSenderTwoReceiversTest(MessagingHandler):
    +    def __init__(self, receiver1_host, receiver2_host, sender_host, address):
    +        super(MobileAddressOneSenderTwoReceiversTest, self).__init__()
    +        self.receiver1_host = receiver1_host
    +        self.receiver2_host = receiver2_host
    +        self.sender_host = sender_host
    +        self.address = address
    +
    +        # One sender connection and two receiver connections
    +        self.receiver1_conn = None
    +        self.receiver2_conn = None
    +        self.sender_conn   = None
    +
    +        self.receiver1 = None
    +        self.receiver2 = None
    +        self.sender = None
    +
    +        self.count = 300
    +        self.rel_count = 50
    +        self.n_rcvd1 = 0
    +        self.n_rcvd2 = 0
    +        self.n_sent = 0
    +        self.n_settled = 0
    +        self.n_released = 0
    +        self.error = None
    +        self.timer = None
    +        self.all_msgs_received = False
    +        self.recvd_msg_bodies = dict()
    +        self.dup_msg = None
    +
    +    def timeout(self):
    +        if self.dup_msg:
    +            self.error = "Duplicate message %s received " % self.dup_msg
    +        else:
    +            self.error = "Timeout Expired - n_sent=%d n_rcvd=%d n_settled=%d n_released=%d addr=%s" % \
    +                         (self.n_sent, (self.n_rcvd1 + self.n_rcvd2), self.n_settled, self.n_released, self.address)
    +
    +        self.receiver1_conn.close()
    +        self.receiver2_conn.close()
    +        self.sender_conn.close()
    +
    +    def on_start(self, event):
    +        self.timer = event.reactor.schedule(5.0, Timeout(self))
    +
    +        # Create two receivers
    +        self.receiver1_conn = event.container.connect(self.receiver1_host)
    +        self.receiver2_conn = event.container.connect(self.receiver2_host)
    +        self.receiver1 = event.container.create_receiver(self.receiver1_conn,
    +                                                         self.address)
    +        self.receiver2 = event.container.create_receiver(self.receiver2_conn,
    +                                                         self.address)
    +
    +        # Create one sender
    +        self.sender_conn = event.container.connect(self.sender_host)
    +        self.sender = event.container.create_sender(self.sender_conn,
    +                                                    self.address)
    +
    +    def on_sendable(self, event):
    +        while self.n_sent < self.count:
    +            self.sender.send(Message(body="Message %d" % self.n_sent))
    +            self.n_sent += 1
    +
    +    def on_message(self, event):
    +        if self.recvd_msg_bodies.get(event.message.body):
    +            self.dup_msg = event.message.body
    +            self.timeout()
    +        else:
    +            self.recvd_msg_bodies[event.message.body] = event.message.body
    +
    +        if event.receiver == self.receiver1:
    +            self.n_rcvd1 += 1
    +        if event.receiver == self.receiver2:
    +            self.n_rcvd2 += 1
    +
    +        if self.n_sent == self.n_rcvd1 + self.n_rcvd2:
    +            self.all_msgs_received = True
    +
    +    def on_settled(self, event):
    +        self.n_settled += 1
    +        if self.n_settled == self.count:
    +            self.receiver1.close()
    +            self.receiver2.close()
    +            for i in range(self.rel_count):
    +                self.sender.send(Message(body="Message %d" % self.n_sent))
    +                self.n_sent += 1
    +
    +    def on_released(self, event):
    +        self.n_released += 1
    +        if self.n_released == self.rel_count and self.all_msgs_received:
    +            self.receiver1_conn.close()
    +            self.receiver2_conn.close()
    +            self.sender_conn.close()
    +            self.timer.cancel()
    +
    +    def run(self):
    +        Container(self).run()
    +
    +
    +class MobileAddressMulticastTest(MessagingHandler):
    +    def __init__(self, receiver1_host, receiver2_host, receiver3_host,
    +                 sender_host, address, large_msg=False, check_addr=False):
    +        super(MobileAddressMulticastTest, self).__init__()
    +        self.receiver1_host = receiver1_host
    +        self.receiver2_host = receiver2_host
    +        self.receiver3_host = receiver3_host
    +        self.sender_host = sender_host
    +        self.address = address
    +
    +        # One sender connection and three receiver connections
    +        self.receiver1_conn = None
    +        self.receiver2_conn = None
    +        self.receiver3_conn = None
    +        self.sender_conn = None
    +
    +        self.receiver1 = None
    +        self.receiver2 = None
    +        self.receiver3 = None
    +        self.sender = None
    +
    +        self.count = 200
    +        self.n_rcvd1 = 0
    +        self.n_rcvd2 = 0
    +        self.n_rcvd3 = 0
    +        self.n_sent = 0
    +        self.n_settled = 0
    +        self.n_released = 0
    +        self.error = None
    +        self.timer = None
    +        self.all_msgs_received = False
    +        self.recvd1_msgs = dict()
    +        self.recvd2_msgs = dict()
    +        self.recvd3_msgs = dict()
    +        self.dup_msg_rcvd = False
    +        self.dup_msg = None
    +        self.receiver_name = None
    +        self.large_msg = large_msg
    +        self.body = ""
    +        self.r_attaches = 0
    +        self.addr_timer = None
    +        self.num_attempts = 0
    +        self.container = None
    +        self.check_addr = check_addr
    +
    +        if self.large_msg:
    +            for i in range(10000):
    +                self.body += "0123456789101112131415"
    +
    +    def timeout(self):
    +        if self.dup_msg:
    +            self.error = "%s received  duplicate message %s" % \
    +                         (self.receiver_name, self.dup_msg)
    +        else:
    +            if not self.error:
    +                self.error = "Timeout Expired - n_sent=%d n_rcvd1=%d " \
    +                             "n_rcvd2=%d n_rcvd3=%d addr=%s" % \
    +                             (self.n_sent, self.n_rcvd1, self.n_rcvd2,
    +                              self.n_rcvd3, self.address)
    +        self.receiver1_conn.close()
    +        self.receiver2_conn.close()
    +        self.receiver3_conn.close()
    +        if self.sender_conn:
    +            self.sender_conn.close()
    +
    +    def check_address(self):
    +        local_node = Node.connect(self.sender_host, timeout=TIMEOUT)
    +        outs = local_node.query(type='org.apache.qpid.dispatch.router.address')
    +        found = False
    +        for result in outs.results:
    +            if self.address in result[0]:
    +                found = True
    +                self.sender_conn = self.container.connect(self.sender_host)
    +                self.sender = self.container.create_sender(self.sender_conn,
    +                                                           self.address)
    +                local_node.close()
    +                break
    +
    +        if not found:
    +            self.error = "Unable to create sender because of " \
    +                         "absence of address in the address table"
    +            self.addr_timer.cancel()
    +            self.timeout()
    +            local_node.close()
    +
    +    def create_sndr(self):
    +        self.sender_conn = self.container.connect(self.sender_host)
    +        self.sender = self.container.create_sender(self.sender_conn,
    +                                                   self.address)
    +
    +    def on_start(self, event):
    +        if self.large_msg:
    +            self.timer = event.reactor.schedule(10.0, Timeout(self))
    +        else:
    +            self.timer = event.reactor.schedule(20.0, Timeout(self))
    +
    +        # Create three receivers
    +        self.receiver1_conn = event.container.connect(self.receiver1_host)
    +        self.receiver2_conn = event.container.connect(self.receiver2_host)
    +        self.receiver3_conn = event.container.connect(self.receiver3_host)
    +        self.receiver1 = event.container.create_receiver(self.receiver1_conn,
    +                                                         self.address)
    +        self.receiver2 = event.container.create_receiver(self.receiver2_conn,
    +                                                         self.address)
    +        self.receiver3 = event.container.create_receiver(self.receiver3_conn,
    +                                                         self.address)
    +        self.container = event.container
    +
    +    def on_link_opened(self, event):
    +        if event.receiver == self.receiver1 or \
    +                event.receiver == self.receiver2 or \
    +                event.receiver == self.receiver3:
    +            self.r_attaches += 1
    +            if self.r_attaches == 3:
    +                self.addr_timer = event.reactor.schedule(4.0,
    --- End diff --
    
    Why do you wait so long?  These tests take a very long time to run.
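
One way to shorten the wait, sketched here as a suggestion rather than as the test's actual design, is to poll for the condition at a short interval and give up only at a deadline, instead of always sleeping for the full period. `wait_for` and its parameters are hypothetical names for this illustration. The helper blocks, so inside a Proton handler the same idea would need to be expressed by rescheduling a short timer rather than calling it directly.

```python
import time


def wait_for(predicate, timeout=4.0, interval=0.1,
             clock=time.monotonic, sleep=time.sleep):
    """Poll predicate() until it returns a truthy value or timeout elapses.

    Returns True as soon as the predicate succeeds, False on timeout.
    """
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# Example: a condition that becomes true on the third poll.
calls = {"n": 0}


def address_is_present():
    calls["n"] += 1
    return calls["n"] >= 3


found = wait_for(address_is_present, timeout=1.0, interval=0.01)
```

With this shape the common case finishes after a few short polls, and the full timeout is only spent when the address never shows up.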


---
