You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/01/19 00:19:13 UTC

[1/3] qpid-dispatch git commit: DISPATCH-179 - Bitmask module update: Add cardinality accessor, add last-value return, add for-each loop macro

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 69a46c623 -> 025cf4fa5


DISPATCH-179 - Bitmask module update: Add cardinality accessor, add last-value return, add for-each loop macro


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/570aa27d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/570aa27d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/570aa27d

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 570aa27df1637f1738861fab01c44de03d38a06a
Parents: 69a46c6
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Jan 18 16:53:50 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Jan 18 16:53:50 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/bitmask.h | 10 ++++-
 src/bitmask.c                   | 75 +++++++++++++++++++++++++++++++++---
 tests/tool_test.c               | 46 ++++++++++++++++++----
 3 files changed, 117 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/570aa27d/include/qpid/dispatch/bitmask.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/bitmask.h b/include/qpid/dispatch/bitmask.h
index 4a6bd46..91239ec 100644
--- a/include/qpid/dispatch/bitmask.h
+++ b/include/qpid/dispatch/bitmask.h
@@ -40,10 +40,16 @@ qd_bitmask_t *qd_bitmask(int initial);
 void qd_bitmask_free(qd_bitmask_t *b);
 void qd_bitmask_set_all(qd_bitmask_t *b);
 void qd_bitmask_clear_all(qd_bitmask_t *b);
-void qd_bitmask_set_bit(qd_bitmask_t *b, int bitnum);
-void qd_bitmask_clear_bit(qd_bitmask_t *b, int bitnum);
+int qd_bitmask_set_bit(qd_bitmask_t *b, int bitnum);
+int qd_bitmask_clear_bit(qd_bitmask_t *b, int bitnum);
 int qd_bitmask_value(qd_bitmask_t *b, int bitnum);
 int qd_bitmask_first_set(qd_bitmask_t *b, int *bitnum);
+int qd_bitmask_cardinality(const qd_bitmask_t *b);
+
+int _qdbm_start(qd_bitmask_t *b);
+void _qdbm_next(qd_bitmask_t *b, int *v);
+
+#define QD_BITMASK_EACH(M,V,C) C=qd_bitmask_cardinality(M),V=_qdbm_start(M);V>=0 && C;_qdbm_next(M,&V),C--
 
 ///@}
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/570aa27d/src/bitmask.c
----------------------------------------------------------------------
diff --git a/src/bitmask.c b/src/bitmask.c
index ac5f201..3ea79ed 100644
--- a/src/bitmask.c
+++ b/src/bitmask.c
@@ -24,12 +24,13 @@
 #include <stdlib.h>
 #include <sys/types.h>
 
-#define QD_BITMASK_LONGS 16
+#define QD_BITMASK_LONGS 2
 #define QD_BITMASK_BITS  (QD_BITMASK_LONGS * 64)
 
 struct qd_bitmask_t {
     uint64_t array[QD_BITMASK_LONGS];
     int      first_set;
+    int      cardinality;
 };
 
 ALLOC_DECLARE(qd_bitmask_t);
@@ -69,7 +70,8 @@ void qd_bitmask_set_all(qd_bitmask_t *b)
 {
     for (int i = 0; i < QD_BITMASK_LONGS; i++)
         b->array[i] = 0xFFFFFFFFFFFFFFFF;
-    b->first_set = 0;
+    b->first_set   = 0;
+    b->cardinality = QD_BITMASK_BITS;
 }
 
 
@@ -77,25 +79,42 @@ void qd_bitmask_clear_all(qd_bitmask_t *b)
 {
     for (int i = 0; i < QD_BITMASK_LONGS; i++)
         b->array[i] = 0;
-    b->first_set = FIRST_NONE;
+    b->first_set   = FIRST_NONE;
+    b->cardinality = 0;
 }
 
 
-void qd_bitmask_set_bit(qd_bitmask_t *b, int bitnum)
+int qd_bitmask_set_bit(qd_bitmask_t *b, int bitnum)
 {
+    int old_value = 1;
     assert(bitnum < QD_BITMASK_BITS);
+    if ((b->array[MASK_INDEX(bitnum)] & MASK_ONEHOT(bitnum)) == 0) {
+        old_value = 0;
+        b->cardinality++;
+    }
+
     b->array[MASK_INDEX(bitnum)] |= MASK_ONEHOT(bitnum);
     if (b->first_set > bitnum || b->first_set < 0)
         b->first_set = bitnum;
+
+    return old_value;
 }
 
 
-void qd_bitmask_clear_bit(qd_bitmask_t *b, int bitnum)
+int qd_bitmask_clear_bit(qd_bitmask_t *b, int bitnum)
 {
+    int old_value = 0;
     assert(bitnum < QD_BITMASK_BITS);
+    if (b->array[MASK_INDEX(bitnum)] & MASK_ONEHOT(bitnum)) {
+        old_value = 1;
+        b->cardinality--;
+    }
+
     b->array[MASK_INDEX(bitnum)] &= ~(MASK_ONEHOT(bitnum));
     if (b->first_set == bitnum)
         b->first_set = FIRST_UNKNOWN;
+
+    return old_value;
 }
 
 
@@ -125,3 +144,49 @@ int qd_bitmask_first_set(qd_bitmask_t *b, int *bitnum)
     *bitnum = b->first_set;
     return 1;
 }
+
+
+int qd_bitmask_cardinality(const qd_bitmask_t *b)
+{
+    return b->cardinality;
+}
+
+
+int _qdbm_start(qd_bitmask_t *b)
+{
+    int v;
+    if (qd_bitmask_first_set(b, &v))
+        return v;
+    return -1;
+}
+
+
+void _qdbm_next(qd_bitmask_t *b, int *v)
+{
+    if (*v == QD_BITMASK_BITS - 1) {
+        *v = -1;
+        return;
+    }
+
+    int      idx  = MASK_INDEX(*v);
+    uint64_t bits = MASK_ONEHOT(*v);
+    int      next = *v;
+
+    while (1) {
+        next++;
+        if (bits & 0x8000000000000000LL) {
+            if (++idx == QD_BITMASK_LONGS) {
+                *v = -1;
+                return;
+            }
+            bits = 1;
+        } else
+            bits = bits << 1;
+
+        if (b->array[idx] & bits) {
+            *v = next;
+            return;
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/570aa27d/tests/tool_test.c
----------------------------------------------------------------------
diff --git a/tests/tool_test.c b/tests/tool_test.c
index 74a7aae..a4ce830 100644
--- a/tests/tool_test.c
+++ b/tests/tool_test.c
@@ -194,25 +194,57 @@ static char* test_bitmask(void *context)
 {
     qd_bitmask_t *bm;
     int           num;
+    int           old;
+    int           c;
+    int           total;
+    int           count;
 
     bm = qd_bitmask(0);
-    if (!bm)                            return "Can't allocate a bit mask";
-    if (qd_bitmask_first_set(bm, &num)) return "Expected no first set bit";
-
-    qd_bitmask_set_bit(bm, 3);
-    qd_bitmask_set_bit(bm, 500);
+    if (!bm)                             return "Can't allocate a bit mask";
+    if (qd_bitmask_first_set(bm, &num))  return "Expected no first set bit";
+    if (qd_bitmask_cardinality(bm) != 0) return "Expected cardinality == 0";
+
+    old = qd_bitmask_set_bit(bm, 3);
+    if (old)                             return "Expected old value to be zero";
+    if (qd_bitmask_cardinality(bm) != 1) return "Expected cardinality == 1";
+    old = qd_bitmask_set_bit(bm, 3);
+    if (!old)                            return "Expected old value to be one";
+    qd_bitmask_set_bit(bm, 100);
+    if (qd_bitmask_cardinality(bm) != 2) return "Expected cardinality == 2";
 
     if (!qd_bitmask_first_set(bm, &num)) return "Expected first set bit";
     if (num != 3)                        return "Expected first set bit to be 3";
 
-    qd_bitmask_clear_bit(bm, num);
+    old = qd_bitmask_clear_bit(bm, num);
+    if (!old)                            return "Expected old value to be one(2)";
+    old = qd_bitmask_clear_bit(bm, num);
+    if (old)                             return "Expected old value to be zero(2)";
 
     if (!qd_bitmask_first_set(bm, &num)) return "Expected first set bit (2)";
-    if (num != 500)                      return "Expected first set bit to be 500";
+    if (num != 100)                      return "Expected first set bit to be 100";
 
     qd_bitmask_clear_bit(bm, num);
     if (qd_bitmask_first_set(bm, &num)) return "Expected no first set bit (2)";
 
+    qd_bitmask_set_bit(bm, 6);
+    qd_bitmask_set_bit(bm, 2);
+    qd_bitmask_set_bit(bm, 4);
+    qd_bitmask_set_bit(bm, 8);
+    qd_bitmask_set_bit(bm, 70);
+    qd_bitmask_clear_bit(bm, 8);
+    qd_bitmask_clear_bit(bm, 80);
+
+    if (qd_bitmask_cardinality(bm) != 4) return "Expected cardinality == 4";
+
+    total = 0;
+    count = 0;
+    for (QD_BITMASK_EACH(bm, num, c)) {
+        total += num;
+        count++;
+    }
+    if (count != 4)  return "Expected count to be 4";
+    if (total != 82) return "Expected bit-number total to be 82";
+
     qd_bitmask_free(bm);
 
     return 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-dispatch git commit: DISPATCH-179 - Changed from address->rnode linkage to using a bitmask to identify host routers for an address.

Posted by tr...@apache.org.
DISPATCH-179 - Changed from address->rnode linkage to using a bitmask to
               identify host routers for an address.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/65234d76
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/65234d76
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/65234d76

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 65234d76f213038a07164c10949a9569d97e3970
Parents: 570aa27
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Jan 18 17:04:30 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Jan 18 17:04:30 2016 -0500

----------------------------------------------------------------------
 src/router_core/agent_address.c       |  2 +-
 src/router_core/connections.c         |  3 +-
 src/router_core/forwarder.c           | 78 ++++++++++++++++++++++++------
 src/router_core/route_tables.c        | 28 ++++++++---
 src/router_core/router_core.c         |  1 +
 src/router_core/router_core_private.h |  2 +-
 src/router_core/transfer.c            |  3 ++
 7 files changed, 93 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65234d76/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
index e8e82ac..c868e39 100644
--- a/src/router_core/agent_address.c
+++ b/src/router_core/agent_address.c
@@ -62,7 +62,7 @@ static void qdr_insert_address_columns_CT(qdr_address_t        *addr,
             break;
 
         case QDR_ADDRESS_REMOTE_COUNT:
-            qd_compose_insert_uint(body, DEQ_SIZE(addr->rnodes));
+            qd_compose_insert_uint(body, qd_bitmask_cardinality(addr->rnodes));
             break;
 
         case QDR_ADDRESS_HOST_ROUTERS:

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65234d76/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 0833214..8362261 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -481,10 +481,11 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
     // deleted.
     //
     if (DEQ_SIZE(addr->subscriptions) == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 &&
-        DEQ_SIZE(addr->rnodes) == 0 && !addr->waypoint && !addr->block_deletion) {
+        qd_bitmask_cardinality(addr->rnodes) == 0 && !addr->waypoint && !addr->block_deletion) {
         qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
         DEQ_REMOVE(core->addrs, addr);
         qd_hash_handle_free(addr->hash_handle);
+        qd_bitmask_free(addr->rnodes);
         free_qdr_address_t(addr);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65234d76/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 78fbe1d..d7dc176 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -140,10 +140,8 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);
         qdr_address_t *origin_addr;
         qd_hash_retrieve(core->addr_hash, ingress_iter, (void*) &origin_addr);
-        if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
-            qdr_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
-            origin = rref->router->mask_bit;
-        }
+        if (origin_addr && qd_bitmask_cardinality(origin_addr->rnodes) == 1)
+            qd_bitmask_first_set(origin_addr->rnodes, &origin);
     } else
         origin = 0;
 
@@ -151,10 +149,10 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     // Forward to the next-hops for remote destinations.
     //
     if (origin >= 0) {
-        qdr_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
-        qdr_link_t       *dest_link;
-        qdr_node_t       *next_node;
-        qd_bitmask_t     *link_set = qd_bitmask(0);
+        int           dest_bit;
+        qdr_link_t   *dest_link;
+        qdr_node_t   *next_node;
+        qd_bitmask_t *link_set = qd_bitmask(0);
 
         //
         // Loop over the target nodes for this address.  Build a set of outgoing links
@@ -164,15 +162,20 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         // will send only one copy of the message over the link and allow a downstream
         // router to fan the message out.
         //
-        while (dest_node_ref) {
-            if (dest_node_ref->router->next_hop)
-                next_node = dest_node_ref->router->next_hop;
+        int c;
+        for (QD_BITMASK_EACH(addr->rnodes, dest_bit, c)) {
+            qdr_node_t *rnode = core->routers_by_mask_bit[dest_bit];
+            if (!rnode)
+                continue;
+
+            if (rnode->next_hop)
+                next_node = rnode->next_hop;
             else
-                next_node = dest_node_ref->router;
+                next_node = rnode;
+
             dest_link = control ? next_node->peer_control_link : next_node->peer_data_link;
-            if (dest_link && qd_bitmask_value(dest_node_ref->router->valid_origins, origin))
+            if (dest_link && qd_bitmask_value(rnode->valid_origins, origin))
                 qd_bitmask_set_bit(link_set, dest_link->conn->mask_bit);
-            dest_node_ref = DEQ_NEXT(dest_node_ref);
         }
 
         //
@@ -219,6 +222,53 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
                            bool             exclude_inprocess,
                            bool             control)
 {
+    //
+    // Forward to an in-process subscriber if there is one
+    //
+    if (!exclude_inprocess) {
+        qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
+        if (sub) {
+            qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg);
+
+            //
+            // Rotate this subscription to the end of the list to get round-robin distribution
+            //
+            if (DEQ_SIZE(addr->subscriptions) > 1) {
+                DEQ_REMOVE_HEAD(addr->subscriptions);
+                DEQ_INSERT_TAIL(addr->subscriptions, sub);
+            }
+
+            return 1;
+        }
+    }
+
+    //
+    // Forward to a local subscriber
+    //
+    qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
+    if (link_ref) {
+        qdr_link_t     *out_link     = link_ref->link;
+        qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+        qdr_forward_deliver_CT(core, out_link, out_delivery);
+
+        //
+        // If there are multiple local subscribers, rotate the list of link references
+        // so deliveries will be distributed among the subscribers in a round-robin pattern.
+        //
+        if (DEQ_SIZE(addr->rlinks) > 1) {
+            DEQ_REMOVE_HEAD(addr->rlinks);
+            DEQ_INSERT_TAIL(addr->rlinks, link_ref);
+        }
+
+        return 1;
+    }
+
+    //
+    // TODO
+    // Forward to remote routers with subscribers using the appropriate
+    // link for the traffic class: control or data
+    //
+
     return 0;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65234d76/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index fd88a6b..b2e11e2 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -255,14 +255,19 @@ static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
         //
         // Link the router record to the address record.
         //
-        qdr_add_node_ref(&addr->rnodes, rnode);
+        qd_bitmask_set_bit(addr->rnodes, router_maskbit);
 
         //
         // Link the router record to the router address records.
         // Use the T-class addresses only.
         //
-        qdr_add_node_ref(&core->router_addr_T->rnodes, rnode);
-        qdr_add_node_ref(&core->routerma_addr_T->rnodes, rnode);
+        qd_bitmask_set_bit(core->router_addr_T->rnodes, router_maskbit);
+        qd_bitmask_set_bit(core->routerma_addr_T->rnodes, router_maskbit);
+
+        //
+        // Bump the ref-count by three for each of the above links.
+        //
+        rnode->ref_count += 3;
 
         //
         // Add the router record to the mask-bit index.
@@ -295,7 +300,10 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
     //
     // Unlink the router node from the address record
     //
-    qdr_del_node_ref(&oaddr->rnodes, rnode);
+    qd_bitmask_clear_bit(oaddr->rnodes, router_maskbit);
+    qd_bitmask_clear_bit(core->router_addr_T->rnodes, router_maskbit);
+    qd_bitmask_clear_bit(core->routerma_addr_T->rnodes, router_maskbit);
+    rnode->ref_count -= 3;
 
     //
     // While the router node has a non-zero reference count, look for addresses
@@ -303,7 +311,11 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
     //
     qdr_address_t *addr = DEQ_HEAD(core->addrs);
     while (addr && rnode->ref_count > 0) {
-        qdr_del_node_ref(&addr->rnodes, rnode);
+        if (qd_bitmask_clear_bit(addr->rnodes, router_maskbit))
+            //
+            // If the cleared bit was originally set, decrement the ref count
+            //
+            rnode->ref_count--;
         addr = DEQ_NEXT(addr);
     }
     assert(rnode->ref_count == 0);
@@ -493,7 +505,8 @@ static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool
         }
 
         qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
-        qdr_add_node_ref(&addr->rnodes, rnode);
+        qd_bitmask_set_bit(addr->rnodes, router_maskbit);
+        rnode->ref_count++;
 
         //
         // TODO - If this affects a waypoint, create the proper side effects
@@ -536,7 +549,8 @@ static void qdr_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, boo
             break;
         }
 
-        qdr_del_node_ref(&addr->rnodes, rnode);
+        qd_bitmask_clear_bit(addr->rnodes, router_maskbit);
+        rnode->ref_count--;
 
         //
         // TODO - If this affects a waypoint, create the proper side effects

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65234d76/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index bf8be0e..c842e66 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -162,6 +162,7 @@ qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_semantics_t semantics
     qdr_address_t *addr = new_qdr_address_t();
     ZERO(addr);
     addr->forwarder = qdr_forwarder_CT(core, semantics);
+    addr->rnodes    = qd_bitmask(0);
     return addr;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65234d76/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 9b0194f..691ddb0 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -254,7 +254,7 @@ struct qdr_address_t {
     qdr_lrp_ref_list_t       lrps;          ///< Local link-route destinations
     qdr_link_ref_list_t      rlinks;        ///< Locally-Connected Consumers
     qdr_link_ref_list_t      inlinks;       ///< Locally-Connected Producers
-    qdr_router_ref_list_t    rnodes;        ///< Remotely-Connected Consumers
+    qd_bitmask_t            *rnodes;        ///< Bitmask of remote routers with connected consumers
     qd_hash_handle_t        *hash_handle;   ///< Linkage back to the hash table entry
     qdr_forwarder_t         *forwarder;
     bool                     toggle;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65234d76/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 5fd61eb..a6a412b 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -288,6 +288,9 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
         qd_address_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH);
         qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &addr);
         if (addr)
+            //
+            // Forward the message.  We don't care what the fanout count is.
+            //
             (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control);
         else
             qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-dispatch git commit: DISPATCH-179 - Added a new facility to efficiently convert trace lists into bitmasks describing a set of routers. This will be used to properly implement loop-prevention in the core (the resu

Posted by tr...@apache.org.
DISPATCH-179 - Added a new facility to efficiently convert trace lists into bitmasks
               describing a set of routers.  This will be used to properly implement
               loop-prevention in the core (the resulting bitmasks will be used by the
               core as an exclusion list for forwarding).


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/025cf4fa
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/025cf4fa
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/025cf4fa

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 025cf4fa5d1ec602d1b128ae2b38a42745fcafbd
Parents: 65234d7
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Jan 18 18:16:36 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Jan 18 18:16:36 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/trace_mask.h |  75 ++++++++++++++++++++
 src/CMakeLists.txt                 |   1 +
 src/trace_mask.c                   | 119 ++++++++++++++++++++++++++++++++
 3 files changed, 195 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/025cf4fa/include/qpid/dispatch/trace_mask.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/trace_mask.h b/include/qpid/dispatch/trace_mask.h
new file mode 100644
index 0000000..7b025b0
--- /dev/null
+++ b/include/qpid/dispatch/trace_mask.h
@@ -0,0 +1,75 @@
+#ifndef __trace_mask_h__
+#define __trace_mask_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/dispatch.h>
+#include <qpid/dispatch/parse.h>
+#include <qpid/dispatch/bitmask.h>
+
+typedef struct qd_tracemask_t qd_tracemask_t;
+
+/**
+ * qd_tracemask
+ *
+ * Create a TraceMask object.
+ */
+qd_tracemask_t *qd_tracemask(void);
+
+/**
+ * qd_tracemask_free
+ *
+ * Destroy a TraceMask object and free its allocated resources.
+ */
+void qd_tracemask_free(qd_tracemask_t *tm);
+
+/**
+ * qd_tracemask_add_router
+ *
+ * Notify the TraceMask of a new router and its assigned mask bit.
+ *
+ * @param tm Tracemask created by qd_tracemask()
+ * @param address The address of the remote router as reported by the router module.
+ * @param maskbit The mask bit assigned to this router by the router module.
+ */
+void qd_tracemask_add_router(qd_tracemask_t *tm, const char *address, int maskbit);
+
+/**
+ * qd_tracemask_del_router
+ *
+ * Notify the TraceMask of the removal of a router.
+ *
+ * @param tm Tracemask created by qd_tracemask()
+ * @param maskbit The mask bit assigned to this router by the router module.
+ */
+void qd_tracemask_del_router(qd_tracemask_t *tm, int maskbit);
+
+/**
+ * qd_tracemask_create
+ *
+ * Create a new bitmask with a bit set for every router mentioned in the trace list.
+ *
+ * @param tm Tracemask created by qd_tracemask()
+ * @param tracelist The parsed field from a message's trace header
+ * @return A new bit mask with a set-bit for each router in the list.  This must be freed
+ *         by the caller when the caller is done with it.
+ */
+qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *tracelist);
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/025cf4fa/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5301081..5ae8076 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -82,6 +82,7 @@ set(qpid_dispatch_SOURCES
   schema_enum.c
   server.c
   timer.c
+  trace_mask.c
   )
 
 if(USE_MEMORY_POOL)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/025cf4fa/src/trace_mask.c
----------------------------------------------------------------------
diff --git a/src/trace_mask.c b/src/trace_mask.c
new file mode 100644
index 0000000..b5c505a
--- /dev/null
+++ b/src/trace_mask.c
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/trace_mask.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/hash.h>
+#include "alloc.h"
+
+typedef struct {
+    qd_hash_handle_t *hash_handle;
+    int               maskbit;
+} qdtm_router_t;
+
+ALLOC_DECLARE(qdtm_router_t);
+ALLOC_DEFINE(qdtm_router_t);
+
+struct qd_tracemask_t {
+    sys_rwlock_t   *lock;
+    qd_hash_t      *hash;
+    qdtm_router_t **router_by_mask_bit;
+};
+
+
+qd_tracemask_t *qd_tracemask(void)
+{
+    qd_tracemask_t *tm = NEW(qd_tracemask_t);
+    tm->lock               = sys_rwlock();
+    tm->hash               = qd_hash(8, 1, 0);
+    tm->router_by_mask_bit = NEW_PTR_ARRAY(qdtm_router_t, qd_bitmask_width());
+
+    for (int i = 0; i < qd_bitmask_width(); i++)
+        tm->router_by_mask_bit[i] = 0;
+    return tm;
+}
+
+
+void qd_tracemask_free(qd_tracemask_t *tm)
+{
+    for (int i = 0; i < qd_bitmask_width(); i++) {
+        if (tm->router_by_mask_bit[i])
+            qd_tracemask_del_router(tm, i);
+    }
+
+    qd_hash_free(tm->hash);
+    sys_rwlock_free(tm->lock);
+    free(tm);
+}
+
+
+void qd_tracemask_add_router(qd_tracemask_t *tm, const char *address, int maskbit)
+{
+    qd_field_iterator_t *iter = qd_address_iterator_string(address, ITER_VIEW_NODE_HASH);
+    sys_rwlock_wrlock(tm->lock);
+    assert(maskbit < qd_bitmask_width() && tm->router_by_mask_bit[maskbit] == 0);
+    if (maskbit < qd_bitmask_width() && tm->router_by_mask_bit[maskbit] == 0) {
+        qdtm_router_t *router = new_qdtm_router_t();
+        router->maskbit = maskbit;
+        qd_hash_insert(tm->hash, iter, router, &router->hash_handle);
+        tm->router_by_mask_bit[maskbit] = router;
+    }
+    sys_rwlock_unlock(tm->lock);
+    qd_field_iterator_free(iter);
+}
+
+
+void qd_tracemask_del_router(qd_tracemask_t *tm, int maskbit)
+{
+    sys_rwlock_wrlock(tm->lock);
+    assert(maskbit < qd_bitmask_width() && tm->router_by_mask_bit[maskbit] != 0);
+    if (maskbit < qd_bitmask_width() && tm->router_by_mask_bit[maskbit] != 0) {
+        qdtm_router_t *router = tm->router_by_mask_bit[maskbit];
+        qd_hash_remove_by_handle(tm->hash, router->hash_handle);
+        tm->router_by_mask_bit[maskbit] = 0;
+        free_qdtm_router_t(router);
+    }
+    sys_rwlock_unlock(tm->lock);
+}
+
+
+qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *tracelist)
+{
+    qd_bitmask_t *bm  = qd_bitmask(0);
+    int           idx = 0;
+
+    assert(qd_parse_is_list(tracelist));
+
+    sys_rwlock_rdlock(tm->lock);
+    qd_parsed_field_t *item   = qd_parse_sub_value(tracelist, idx);
+    qdtm_router_t     *router = 0;
+    while (item) {
+        qd_field_iterator_t *iter = qd_parse_raw(item);
+        qd_address_iterator_reset_view(iter, ITER_VIEW_NODE_HASH);
+        qd_hash_retrieve(tm->hash, iter, (void*) &router);
+        if (router)
+            qd_bitmask_set_bit(bm, router->maskbit);
+        idx++;
+        item = qd_parse_sub_value(tracelist, idx);
+    }
+    sys_rwlock_unlock(tm->lock);
+    return bm;
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org