You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2018/03/21 20:35:34 UTC

[1/2] qpid-dispatch git commit: DISPATCH-89: Exchange Binding forwarder

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master dfa5fd46d -> fff61db81


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index c58ef5b..0f22df5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -36,6 +36,7 @@ typedef struct qdr_link_route_t      qdr_link_route_t;
 typedef struct qdr_auto_link_t       qdr_auto_link_t;
 typedef struct qdr_conn_identifier_t qdr_conn_identifier_t;
 typedef struct qdr_connection_ref_t  qdr_connection_ref_t;
+typedef struct qdr_exchange          qdr_exchange_t;
 
 qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment);
 int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
@@ -465,7 +466,12 @@ struct qdr_address_t {
     // State for "balanced" treatment
     //
     int *outstanding_deliveries;
-    
+
+    //
+    // State for "exchange" treatment
+    //
+    qdr_exchange_t      *exchange;  // weak ref
+
     /**@name Statistics */
     ///@{
     uint64_t deliveries_ingress;
@@ -623,6 +629,7 @@ struct qdr_conn_identifier_t {
 };
 
 ALLOC_DECLARE(qdr_conn_identifier_t);
+DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t);
 
 
 struct qdr_core_t {
@@ -706,6 +713,7 @@ struct qdr_core_t {
     uint64_t              next_identifier;
     sys_mutex_t          *id_lock;
 
+    qdr_exchange_list_t   exchanges;
     qdr_forwarder_t      *forwarders[QD_TREATMENT_LINK_BALANCED + 1];
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index b1ba497..e8d37ef 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -18,6 +18,7 @@
  */
 
 #include "router_core_private.h"
+#include "exchange_bindings.h"
 #include <qpid/dispatch/amqp.h>
 #include <stdio.h>
 #include <inttypes.h>
@@ -776,8 +777,13 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
  */
 static long qdr_addr_path_count_CT(qdr_address_t *addr)
 {
-    return (long) DEQ_SIZE(addr->subscriptions) + (long) DEQ_SIZE(addr->rlinks) +
-        (long) qd_bitmask_cardinality(addr->rnodes);
+    long rc = ((long) DEQ_SIZE(addr->subscriptions)
+               + (long) DEQ_SIZE(addr->rlinks)
+               + (long) qd_bitmask_cardinality(addr->rnodes));
+    if (addr->exchange)
+        rc += qdr_exchange_binding_count(addr->exchange)
+            + ((qdr_exchange_alternate_addr(addr->exchange)) ? 1 : 0);
+    return rc;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index 743c4f8..6b7347a 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -42,6 +42,8 @@ qd_error_t qd_pyrouter_tick(qd_router_t *router);
 qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity);
 qd_error_t qd_router_configure_link_route(qd_router_t *router, qd_entity_t *entity);
 qd_error_t qd_router_configure_auto_link(qd_router_t *router, qd_entity_t *entity);
+qd_error_t qd_router_configure_exchange(qd_router_t *router, qd_entity_t *entity);
+qd_error_t qd_router_configure_binding(qd_router_t *router, qd_entity_t *entity);
 
 void qd_router_configure_free(qd_router_t *router);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index ff8f787..b2f0f2e 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -107,6 +107,7 @@ foreach(py_test_module
     system_tests_topology_disposition
     system_tests_topology_addition
     system_tests_disallow_link_resumable_link_route
+    system_tests_exchange_bindings
     ${SYSTEM_TESTS_HTTP}
     )
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/tests/field_test.c
----------------------------------------------------------------------
diff --git a/tests/field_test.c b/tests/field_test.c
index 6ad1263..425d977 100644
--- a/tests/field_test.c
+++ b/tests/field_test.c
@@ -239,11 +239,9 @@ static char *test_sub_iterator(void *context)
     return 0;
 }
 
-
 static char* view_address_hash(void *context, qd_iterator_t *iter,
                                const char *addr, const char *view)
 {
-    qd_iterator_annotate_phase(iter, '1');
     if (!qd_iterator_equal(iter, (unsigned char*) view)) {
         char *got = (char*) qd_iterator_copy(iter);
         snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed.  Expected '%s', got '%s'",
@@ -254,6 +252,26 @@ static char* view_address_hash(void *context, qd_iterator_t *iter,
     return 0;
 }
 
+static char *check_dup(void *context, const qd_iterator_t *iter,
+                         const char *addr, const char *view)
+{
+    qd_iterator_t *dup = qd_iterator_dup(iter);
+    if (!dup)
+        return "dup of iterator failed";
+    char *ret = view_address_hash(context, dup, addr, view);
+    qd_iterator_free(dup);
+    return ret;
+}
+
+static char *verify_iterator(void *context, qd_iterator_t *iter,
+                             const char *addr, const char *view)
+{
+    char *ret = view_address_hash(context, iter, addr, view);
+    if (!ret)
+        ret = check_dup(context, iter, addr, view);
+    return ret;
+}
+
 static char* test_view_address_hash(void *context)
 {
     struct {const char *addr; const char *view;} cases[] = {
@@ -296,11 +314,10 @@ static char* test_view_address_hash(void *context)
 
     for (idx = 0; cases[idx].addr; idx++) {
         qd_iterator_t *iter = qd_iterator_string(cases[idx].addr, ITER_VIEW_ADDRESS_HASH);
-        char *ret = view_address_hash(context, iter, cases[idx].addr, cases[idx].view);
+        qd_iterator_annotate_phase(iter, '1');
+        char *ret = verify_iterator(context, iter, cases[idx].addr, cases[idx].view);
         qd_iterator_free(iter);
-        if (ret) {
-            return strncpy(fail_text, ret, FAIL_TEXT_SIZE-1);
-        }
+        if (ret) return ret;
     }
 
     for (idx = 0; cases[idx].addr; idx++) {
@@ -310,7 +327,8 @@ static char* test_view_address_hash(void *context)
         qd_iterator_t *iter = qd_iterator_buffer(DEQ_HEAD(chain), 0,
                                                  strlen(cases[idx].addr),
                                                  ITER_VIEW_ADDRESS_HASH);
-        char *ret = view_address_hash(context, iter, cases[idx].addr, cases[idx].view);
+        qd_iterator_annotate_phase(iter, '1');
+        char *ret = verify_iterator(context, iter, cases[idx].addr, cases[idx].view);
         release_buffer_chain(&chain);
         qd_iterator_free(iter);
         if (ret) return ret;
@@ -363,7 +381,8 @@ static char* test_view_address_with_space(void *context)
     for (idx = 0; cases[idx].addr; idx++) {
         qd_iterator_t *iter = qd_iterator_string(cases[idx].addr, ITER_VIEW_ADDRESS_WITH_SPACE);
         qd_iterator_annotate_space(iter, "space/", 6);
-        char *ret = view_address_hash(context, iter, cases[idx].addr, cases[idx].view);
+        qd_iterator_annotate_phase(iter, '1');
+        char *ret = verify_iterator(context, iter, cases[idx].addr, cases[idx].view);
         qd_iterator_free(iter);
         if (ret) return ret;
     }
@@ -376,7 +395,8 @@ static char* test_view_address_with_space(void *context)
                                                  strlen(cases[idx].addr),
                                                  ITER_VIEW_ADDRESS_WITH_SPACE);
         qd_iterator_annotate_space(iter, "space/", 6);
-        char *ret = view_address_hash(context, iter, cases[idx].addr, cases[idx].view);
+        qd_iterator_annotate_phase(iter, '1');
+        char *ret = verify_iterator(context, iter, cases[idx].addr, cases[idx].view);
         release_buffer_chain(&chain);
         qd_iterator_free(iter);
         if (ret) return ret;
@@ -400,14 +420,9 @@ static char* test_view_address_hash_override(void *context)
     for (idx = 0; cases[idx].addr; idx++) {
         qd_iterator_t *iter = qd_iterator_string(cases[idx].addr, ITER_VIEW_ADDRESS_HASH);
         qd_iterator_annotate_prefix(iter, 'C');
-        if (!qd_iterator_equal(iter, (unsigned char*) cases[idx].view)) {
-            char *got = (char*) qd_iterator_copy(iter);
-            snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed.  Expected '%s', got '%s'",
-                     cases[idx].addr, cases[idx].view, got);
-            qd_iterator_free(iter);
-            return fail_text;
-        }
+        char *ret = verify_iterator(context, iter, cases[idx].addr, cases[idx].view);
         qd_iterator_free(iter);
+        if (ret) return ret;
     }
 
     return 0;
@@ -430,15 +445,9 @@ static char* test_view_address_hash_with_space(void *context)
     for (idx = 0; cases[idx].addr; idx++) {
         qd_iterator_t *iter = qd_iterator_string(cases[idx].addr, ITER_VIEW_ADDRESS_HASH);
         qd_iterator_annotate_space(iter, "test.vhost.", 11);
-        if (!qd_iterator_equal(iter, (unsigned char*) cases[idx].view)) {
-            char *got = (char*) qd_iterator_copy(iter);
-            snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed.  Expected '%s', got '%s' (len: %d)",
-                     cases[idx].addr, cases[idx].view, got, qd_iterator_length(iter));
-            free(got);
-            qd_iterator_free(iter);
-            return fail_text;
-        }
+        char *ret = verify_iterator(context, iter, cases[idx].addr, cases[idx].view);
         qd_iterator_free(iter);
+        if (ret) return ret;
     }
 
     return 0;
@@ -457,14 +466,9 @@ static char* test_view_node_hash(void *context)
 
     for (idx = 0; cases[idx].addr; idx++) {
         qd_iterator_t *iter = qd_iterator_string(cases[idx].addr, ITER_VIEW_NODE_HASH);
-        if (!qd_iterator_equal(iter, (unsigned char*) cases[idx].view)) {
-            char *got = (char*) qd_iterator_copy(iter);
-            snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed.  Expected '%s', got '%s'",
-                     cases[idx].addr, cases[idx].view, got);
-            qd_iterator_free(iter);
-            return fail_text;
-        }
+        char *ret = verify_iterator(context, iter, cases[idx].addr, cases[idx].view);
         qd_iterator_free(iter);
+        if (ret) return ret;
     }
 
     return 0;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/tests/parse_tree_tests.c
----------------------------------------------------------------------
diff --git a/tests/parse_tree_tests.c b/tests/parse_tree_tests.c
index ef23f50..e8bcf3f 100644
--- a/tests/parse_tree_tests.c
+++ b/tests/parse_tree_tests.c
@@ -559,24 +559,37 @@ static char *test_multiple_matches(void *context)
 static char *test_validation(void *context)
 {
     qd_iterator_t *iter = qd_iterator_string("sam.*.am.#", ITER_VIEW_ALL);
-    if (!qd_parse_tree_validate_pattern(QD_PARSE_TREE_ADDRESS, iter) ||
-        !qd_parse_tree_validate_pattern(QD_PARSE_TREE_AMQP_0_10, iter)) {
+    qd_parse_tree_t *mqtt_tree = qd_parse_tree_new(QD_PARSE_TREE_MQTT);
+    qd_parse_tree_t *addr_tree = qd_parse_tree_new(QD_PARSE_TREE_ADDRESS);
+    qd_parse_tree_t *amqp_tree = qd_parse_tree_new(QD_PARSE_TREE_AMQP_0_10);
+
+    if (!qd_parse_tree_validate_pattern(addr_tree, iter) ||
+        !qd_parse_tree_validate_pattern(amqp_tree, iter)) {
         return "expected to skip validation";
     }
     qd_iterator_free(iter);
 
     qd_iterator_t *iter_good = qd_iterator_string("sam/+/a.#.m/#", ITER_VIEW_ALL);
-    if (!qd_parse_tree_validate_pattern(QD_PARSE_TREE_MQTT, iter_good)) {
+    if (!qd_parse_tree_validate_pattern(mqtt_tree, iter_good)) {
         return "expected to pass mqtt validation";
     }
     qd_iterator_free(iter_good);
 
     qd_iterator_t *iter_bad = qd_iterator_string("sam/#/am/+", ITER_VIEW_ALL);
-    if (qd_parse_tree_validate_pattern(QD_PARSE_TREE_MQTT, iter_bad)) {
+    if (qd_parse_tree_validate_pattern(mqtt_tree, iter_bad)) {
         return "expected to fail mqtt validation";
     }
     qd_iterator_free(iter_bad);
 
+    qd_iterator_t *iter_const = qd_iterator_string("sam/I/am", ITER_VIEW_ALL);
+    if (!qd_parse_tree_validate_pattern(mqtt_tree, iter_const)) {
+        return "expected to pass mqtt constant string validation";
+    }
+    qd_iterator_free(iter_const);
+
+    qd_parse_tree_free(mqtt_tree);
+    qd_parse_tree_free(addr_tree);
+    qd_parse_tree_free(amqp_tree);
     return NULL;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/tests/system_tests_exchange_bindings.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_exchange_bindings.py b/tests/system_tests_exchange_bindings.py
new file mode 100644
index 0000000..921f1ba
--- /dev/null
+++ b/tests/system_tests_exchange_bindings.py
@@ -0,0 +1,780 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import ast
+import unittest2 as unittest
+from threading import Thread
+from time import sleep
+from subprocess import PIPE, STDOUT
+
+try:
+    import Queue as Queue   # 2.7
+except ImportError:
+    import queue as Queue   # 3.x
+
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
+from proton import Message, Timeout
+from proton.reactor import AtMostOnce, AtLeastOnce
+from proton.utils import BlockingConnection, SendException
+
+#TIMEOUT=5
+_EXCHANGE_TYPE = "org.apache.qpid.dispatch.router.config.exchange"
+_BINDING_TYPE  = "org.apache.qpid.dispatch.router.config.binding"
+
+
+class _AsyncReceiver(object):
+    def __init__(self, address, source, credit=100, timeout=0.1,
+                 conn_args=None, link_args=None):
+        super(_AsyncReceiver, self).__init__()
+        kwargs = {'url': address}
+        if conn_args:
+            kwargs.update(conn_args)
+        self.conn = BlockingConnection(**kwargs)
+        kwargs = {'address': source,
+                  'credit': credit}
+        if link_args:
+            kwargs.update(link_args)
+        self.rcvr = self.conn.create_receiver(**kwargs)
+        self.thread = Thread(target=self._poll)
+        self.queue = Queue.Queue()
+        self._run = True
+        self._timeout = timeout
+        self.thread.start()
+
+    def _poll(self):
+        while self._run:
+            try:
+                msg = self.rcvr.receive(timeout=self._timeout)
+            except Timeout:
+                continue
+            try:
+                self.rcvr.accept()
+            except IndexError:
+                # PROTON-1743
+                pass
+            self.queue.put(msg)
+        self.rcvr.close()
+        self.conn.close()
+
+    def stop(self):
+        self._run = False
+        self.thread.join(timeout=TIMEOUT)
+
+
+class ExchangeBindingsTest(TestCase):
+    """
+    Tests the exchange/bindings of the dispatch router.
+    """
+    def _create_router(self, name, config):
+
+        config = [
+            ('router',   {'mode': 'standalone', 'id': 'QDR.%s'%name}),
+            ('listener', {'role': 'normal', 'host': '0.0.0.0',
+                          'port': self.tester.get_port(),
+                          'saslMechanisms':'ANONYMOUS'})
+            ] + config
+        return self.tester.qdrouterd(name, Qdrouterd.Config(config))
+
+    def run_qdmanage(self, router, cmd, input=None, expect=Process.EXIT_OK):
+        p = self.popen(
+            ['qdmanage'] + cmd.split(' ')
+            + ['--bus', router.addresses[0], '--indent=-1', '--timeout', str(TIMEOUT)],
+            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect)
+        out = p.communicate(input)[0]
+        try:
+            p.teardown()
+        except Exception, e:
+            raise Exception("%s\n%s" % (e, out))
+        return out
+
+    def _validate_entity(self, name, kind, entities, expected):
+        for entity in entities:
+            if "name" in entity and entity["name"] == name:
+                for k,v in expected.items():
+                    self.assertTrue(k in entity)
+                    self.assertEqual(v, entity[k])
+                return
+        raise Exception("Could not find %s named %s" % (kind, name))
+
+    def _validate_exchange(self, router, name, **kwargs):
+        _ = self.run_qdmanage(router, "query --type %s" % _EXCHANGE_TYPE)
+        self._validate_entity(name, "exchange", ast.literal_eval(_), kwargs)
+
+    def _validate_binding(self, router, name, **kwargs):
+        _ = self.run_qdmanage(router, "query --type %s" % _BINDING_TYPE)
+        self._validate_entity(name, "binding", ast.literal_eval(_), kwargs)
+
+    def test_qdmanage(self):
+        """
+        Tests the management API via qdmanage
+        """
+        router = self._create_router("A", [])
+
+        # create exchanges
+        ex_config = [
+            ["Exchange1", {"address": "Address1"}],
+            ["Exchange2", {"address": "Address2",
+                           "phase": 2,
+                           "alternateAddress": "Alternate2",
+                           "alternatePhase": 1,
+                           "matchMethod": "mqtt"}]
+        ]
+
+        for cfg in ex_config:
+            args = ""
+            for k, v in cfg[1].items():
+                args += "%s=%s " % (k, v)
+            self.run_qdmanage(router,
+                              "create --type %s --name %s %s" %
+                              (_EXCHANGE_TYPE, cfg[0], args))
+
+        # validate
+        _ = self.run_qdmanage(router, "query --type %s" % _EXCHANGE_TYPE)
+        query = ast.literal_eval(_)
+        self.assertEqual(len(ex_config), len(query))
+        for cfg in ex_config:
+            self._validate_entity(name=cfg[0],
+                                  kind="exchange",
+                                  entities=query,
+                                  expected=cfg[1])
+        for ex in query:
+            self.assertEqual(0, ex['bindingCount'])
+
+        # create bindings
+        binding_config = [
+            ["b11", {"exchangeName":    "Exchange1",
+                     "bindingKey":      "a.b.*.#",
+                     "nextHopAddress":  "nextHop1",
+                     "nextHopPhase":    3}],
+            ["b12", {"exchangeName":    "Exchange1",
+                     "bindingKey":      "a.*.c.#",
+                     "nextHopAddress":  "nextHop1",
+                     "nextHopPhase":    3}],
+            ["b13", {"exchangeName":    "Exchange1",
+                     "bindingKey":      "a.b.*.#",
+                     "nextHopAddress":  "nextHop2",
+                     "nextHopPhase":    0}],
+            ["b14", {"exchangeName":    "Exchange1",
+                     "bindingKey":      "a.*.c.#",
+                     "nextHopAddress":  "nextHop2",
+                     "nextHopPhase":    0}],
+
+            ["b21", {"exchangeName":    "Exchange2",
+                     "bindingKey":      "a/b/?/#",
+                     "nextHopAddress":  "nextHop3"}],
+            ["b22", {"exchangeName":    "Exchange2",
+                     "bindingKey":      "a",
+                     "nextHopAddress":  "nextHop4"}],
+            ["b23", {"exchangeName":    "Exchange2",
+                     "bindingKey":      "a/b",
+                     "nextHopAddress":  "nextHop4"}],
+            ["b24", {"exchangeName":    "Exchange2",
+                     "bindingKey":      "b",
+                     "nextHopAddress":  "nextHop3"}]
+        ]
+
+        for cfg in binding_config:
+            args = ""
+            for k, v in cfg[1].items():
+                args += "%s=%s " % (k, v)
+            self.run_qdmanage(router,
+                              "create --type %s --name %s %s" %
+                              (_BINDING_TYPE, cfg[0], args))
+
+        # validate
+        _ = self.run_qdmanage(router, "query --type %s" % _BINDING_TYPE)
+        bindings = ast.literal_eval(_)
+        self.assertEqual(len(binding_config), len(bindings))
+        for cfg in binding_config:
+            self._validate_entity(name=cfg[0],
+                                  kind="binding",
+                                  entities=bindings,
+                                  expected=cfg[1])
+
+        _ = self.run_qdmanage(router, "query --type %s" % _EXCHANGE_TYPE)
+        exchanges = ast.literal_eval(_)
+        self.assertEqual(len(ex_config), len(exchanges))
+        for ex in exchanges:
+            self.assertEqual(4, ex["bindingCount"])
+
+        # verify reads
+        _ = self.run_qdmanage(router, "read --type %s --name Exchange2" % _EXCHANGE_TYPE)
+        self.assertEqual("Exchange2", ast.literal_eval(_)["name"])
+        _ = self.run_qdmanage(router, "read --type %s --name b24" % _BINDING_TYPE)
+        self.assertEqual("b24", ast.literal_eval(_)["name"])
+
+        # binding deletion by id:
+        bid = bindings[0]["identity"]
+        self.run_qdmanage(router, "delete --type " + _BINDING_TYPE +
+                              " --identity %s" % bid)
+        _ = self.run_qdmanage(router, "query --type %s" % _BINDING_TYPE)
+        bindings = ast.literal_eval(_)
+        self.assertEqual(len(binding_config) - 1, len(bindings))
+        for binding in bindings:
+            self.assertFalse(binding["identity"] == bid)
+
+        # binding deletion by name:
+        self.run_qdmanage(router, "delete --type " + _BINDING_TYPE +
+                              " --name b14")
+        _ = self.run_qdmanage(router, "query --type %s" % _BINDING_TYPE)
+        bindings = ast.literal_eval(_)
+        self.assertEqual(len(binding_config) - 2, len(bindings))
+        for binding in bindings:
+            self.assertFalse(binding["name"] == "b14")
+
+        # exchange deletion by name:
+        self.run_qdmanage(router, "delete --type " + _EXCHANGE_TYPE +
+                              " --name Exchange1")
+        _ = self.run_qdmanage(router, "query --type %s" % _EXCHANGE_TYPE)
+        exchanges = ast.literal_eval(_)
+        self.assertEqual(len(ex_config) - 1, len(exchanges))
+        self.assertEqual("Exchange2", exchanges[0]["name"])
+
+        # negative testing
+
+        # exchange name is required
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _EXCHANGE_TYPE +
+                          " address=Nope")
+        # exchange address is required
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _EXCHANGE_TYPE +
+                          " --name Nope")
+        # duplicate exchange names
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _EXCHANGE_TYPE +
+                          " --name Exchange2 address=foo")
+        # invalid match method
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _EXCHANGE_TYPE +
+                          " --name Exchange3 address=foo"
+                          " matchMethod=blinky")
+        # duplicate exchange addresses
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _EXCHANGE_TYPE +
+                          " --name Nope address=Address2")
+        # binding with no exchange name
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _BINDING_TYPE +
+                          " --name Nope")
+        # binding with bad exchange name
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _BINDING_TYPE +
+                          " exchangeName=Nope")
+        # binding with duplicate name
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _BINDING_TYPE +
+                          " --name b22 exchangeName=Exchange2"
+                          " bindingKey=b nextHopAddress=nextHop3")
+        # binding with duplicate pattern & next hop
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _BINDING_TYPE +
+                          " --name Nuhuh exchangeName=Exchange2"
+                          " key=b nextHop=nextHop3")
+        # binding with no next hop
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _BINDING_TYPE +
+                          " --name Nuhuh exchangeName=Exchange2"
+                          " bindingKey=x/y/z")
+
+        # invalid mqtt key
+        self.assertRaises(Exception, self.run_qdmanage, router,
+                          "create --type " + _BINDING_TYPE +
+                          " exchangeName=Exchange2"
+                          " bindingKey=x/#/z"
+                          " nextHopAddress=Nope")
+
+        # delete exchange by identity:
+        self.run_qdmanage(router, "delete --type " + _EXCHANGE_TYPE +
+                              " --identity %s" % exchanges[0]["identity"])
+
+    def test_forwarding(self):
+        """
+        Simple forwarding over a single 0-10 exchange
+        """
+        config = [
+            ('exchange', {'address': 'Address1',
+                          'name': 'Exchange1',
+                          'matchMethod': 'amqp'}),
+            # two different patterns, same next hop:
+            ('binding', {'name':           'binding1',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'a.*',
+                         'nextHopAddress': 'nextHop1'}),
+            ('binding', {'name':           'binding2',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'a.b',
+                         'nextHopAddress': 'nextHop1'}),
+            # duplicate patterns, different next hops:
+            ('binding', {'name':           'binding3',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'a.c.#',
+                         'nextHopAddress': 'nextHop1'}),
+            ('binding', {'name': 'binding4',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'a.c.#',
+                         'nextHopAddress': 'nextHop2'}),
+            # match for nextHop2 only
+            ('binding', {'name':           'binding5',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'a.b.c',
+                         'nextHopAddress': 'nextHop2'})
+        ]
+        router = self._create_router('A', config)
+
+        # create clients for message transfer
+        conn = BlockingConnection(router.addresses[0])
+        sender = conn.create_sender(address="Address1", options=AtMostOnce())
+        nhop1 = conn.create_receiver(address="nextHop1", credit=100)
+        nhop2 = conn.create_receiver(address="nextHop2", credit=100)
+
+        # verify initial metrics
+        self._validate_exchange(router, name='Exchange1',
+                                bindingCount=5,
+                                receivedCount=0,
+                                droppedCount=0,
+                                forwardedCount=0,
+                                divertedCount=0)
+
+        for b in range(5):
+            self._validate_binding(router,
+                                   name='binding%s' % (b + 1),
+                                   matchedCount=0)
+
+        # send message with subject "a.b"
+        # matches (binding1, binding2)
+        # forwarded to NextHop1 only
+        sender.send(Message(subject='a.b', body='A'))
+        self.assertEqual('A', nhop1.receive(timeout=TIMEOUT).body)
+
+        # send message with subject "a.c"
+        # matches (bindings 1,3,4)
+        # ->  NextHop1, NextHop2
+        sender.send(Message(subject='a.c', body='B'))
+        self.assertEqual('B', nhop1.receive(timeout=TIMEOUT).body)
+        self.assertEqual('B', nhop2.receive(timeout=TIMEOUT).body)
+
+        # send message with subject "a.c.d"
+        # matches bindings 3,4
+        # -> NextHop1, NextHop2
+        sender.send(Message(subject='a.c.d', body='C'))
+        self.assertEqual('C', nhop1.receive(timeout=TIMEOUT).body)
+        self.assertEqual('C', nhop2.receive(timeout=TIMEOUT).body)
+
+        # send message with subject "x.y.z"
+        # no binding match - expected to drop
+        # not forwarded
+        sender.send(Message(subject='x.y.z', body=["I am Noone"]))
+
+        # send message with subject "a.b.c"
+        # matches binding5
+        # -> NextHop2
+        sender.send(Message(subject='a.b.c', body='D'))
+        self.assertEqual('D', nhop2.receive(timeout=TIMEOUT).body)
+
+        # ensure there are no more messages on either hop:
+
+        self.assertRaises(Timeout, nhop1.receive, timeout=0.25)
+        self.assertRaises(Timeout, nhop2.receive, timeout=0.25)
+
+        # validate counters
+        self._validate_binding(router, name='binding1',
+                               matchedCount=2)
+        self._validate_binding(router, name='binding2',
+                               matchedCount=1)
+        self._validate_binding(router, name='binding3',
+                               matchedCount=2)
+        self._validate_binding(router, name='binding4',
+                               matchedCount=2)
+        self._validate_binding(router, name='binding5',
+                               matchedCount=1)
+        self._validate_exchange(router, name="Exchange1",
+                                receivedCount=5,
+                                forwardedCount=4,
+                                divertedCount=0,
+                                droppedCount=1)
+        conn.close()
+
+    def test_forwarding_mqtt(self):
+        """
+        Simple forwarding over a single mqtt exchange
+        """
+        config = [
+            ('exchange', {'address':          'Address2',
+                          'name':             'Exchange1',
+                          'matchMethod':      'mqtt',
+                          'alternateAddress': 'altNextHop'}),
+
+            ('binding', {'name':           'binding1',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'a/b',
+                         'nextHopAddress': 'nextHop1'}),
+            ('binding', {'name':           'binding2',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'a/+',
+                         'nextHopAddress': 'nextHop2'}),
+            ('binding', {'name':           'binding3',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'c/#',
+                         'nextHopAddress': 'nextHop1'}),
+            ('binding', {'name':           'binding4',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'c/b',
+                         'nextHopAddress': 'nextHop2'}),
+        ]
+        router = self._create_router('B', config)
+
+        # create clients for message transfer
+        conn = BlockingConnection(router.addresses[0])
+        sender = conn.create_sender(address="Address2", options=AtMostOnce())
+        nhop1 = conn.create_receiver(address="nextHop1", credit=100)
+        nhop2 = conn.create_receiver(address="nextHop2", credit=100)
+        alt = conn.create_receiver(address="altNextHop", credit=100)
+
+        # send message with subject "a.b"
+        # matches (binding1, binding2)
+        # forwarded to NextHop1, NextHop2
+        sender.send(Message(subject='a/b', body='A'))
+        self.assertEqual('A', nhop1.receive(timeout=TIMEOUT).body)
+        self.assertEqual('A', nhop2.receive(timeout=TIMEOUT).body)
+
+        # send message with subject "a/c"
+        # matches binding2
+        # ->  NextHop2
+        sender.send(Message(subject='a/c', body='B'))
+        self.assertEqual('B', nhop2.receive(timeout=TIMEOUT).body)
+
+        # send message with subject "c/b"
+        # matches bindings 3,4
+        # -> NextHop1, NextHop2
+        sender.send(Message(subject='c/b', body='C'))
+        self.assertEqual('C', nhop1.receive(timeout=TIMEOUT).body)
+        self.assertEqual('C', nhop2.receive(timeout=TIMEOUT).body)
+
+        # send message with subject "c/b/dee/eee"
+        # matches binding3
+        # -> NextHop1
+        sender.send(Message(subject='c/b/dee/eee', body='D'))
+        self.assertEqual('D', nhop1.receive(timeout=TIMEOUT).body)
+
+        # send message with subject "x.y.z"
+        # no binding match
+        # -> alternate
+        sender.send(Message(subject='x.y.z', body="?"))
+        self.assertEqual('?', alt.receive(timeout=TIMEOUT).body)
+
+        # ensure there are no more messages on either hop:
+
+        self.assertRaises(Timeout, nhop1.receive, timeout=0.25)
+        self.assertRaises(Timeout, nhop2.receive, timeout=0.25)
+        self.assertRaises(Timeout, alt.receive, timeout=0.25)
+
+        # validate counters
+        self._validate_binding(router, name='binding1',
+                               matchedCount=1)
+        self._validate_binding(router, name='binding2',
+                               matchedCount=2)
+        self._validate_binding(router, name='binding3',
+                               matchedCount=2)
+        self._validate_binding(router, name='binding4',
+                               matchedCount=1)
+        self._validate_exchange(router, name="Exchange1",
+                                receivedCount=5,
+                                forwardedCount=5,
+                                divertedCount=1,
+                                droppedCount=0)
+        conn.close()
+
+    def test_forwarding_sync(self):
+        """
+        Forward unsettled messages to multiple subscribers
+        """
+        config = [
+            ('router',   {'mode': 'standalone', 'id': 'QDR.mcast',
+                          'allowUnsettledMulticast': True}),
+            ('listener', {'role': 'normal', 'host': '0.0.0.0',
+                          'port': self.tester.get_port(),
+                          'saslMechanisms':'ANONYMOUS'}),
+            ('address', {'pattern': 'nextHop2/#', 'distribution': 'multicast'}),
+            ('exchange', {'address':          'Address3',
+                          'name':             'Exchange1',
+                          'alternateAddress': 'altNextHop'}),
+            ('binding', {'name':           'binding1',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'a.b',
+                         'nextHopAddress': 'nextHop1'}),
+            ('binding', {'name':           'binding2',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     '*.b',
+                         'nextHopAddress': 'nextHop2'})
+        ]
+        router = self.tester.qdrouterd('QDR.mcast', Qdrouterd.Config(config))
+
+        # create clients for message transfer
+        conn = BlockingConnection(router.addresses[0])
+        sender = conn.create_sender(address="Address3", options=AtLeastOnce())
+        nhop1 = _AsyncReceiver(address=router.addresses[0], source="nextHop1")
+        nhop2A = _AsyncReceiver(address=router.addresses[0], source="nextHop2")
+        nhop2B = _AsyncReceiver(address=router.addresses[0], source="nextHop2")
+        alt = _AsyncReceiver(address=router.addresses[0], source="altNextHop")
+
+        sender.send(Message(subject='a.b', body='A'))
+        sender.send(Message(subject='x.y', body='B'))
+
+        self.assertEqual('A', nhop1.queue.get(timeout=TIMEOUT).body)
+        self.assertEqual('A', nhop2A.queue.get(timeout=TIMEOUT).body)
+        self.assertEqual('A', nhop2B.queue.get(timeout=TIMEOUT).body)
+        self.assertEqual('B', alt.queue.get(timeout=TIMEOUT).body)
+        nhop1.stop()
+        nhop2A.stop()
+        nhop2B.stop()
+        alt.stop()
+        conn.close()
+
+        self.assertTrue(nhop1.queue.empty())
+        self.assertTrue(nhop2A.queue.empty())
+        self.assertTrue(nhop2B.queue.empty())
+        self.assertTrue(alt.queue.empty())
+
+        # ensure failure if unsettled multicast not allowed:
+
+        config = [
+            ('router',   {'mode': 'standalone', 'id': 'QDR.mcast2',
+                          'allowUnsettledMulticast': False}),
+            ('listener', {'role': 'normal', 'host': '0.0.0.0',
+                          'port': self.tester.get_port(),
+                          'saslMechanisms':'ANONYMOUS'}),
+            ('exchange', {'address': 'Address4',
+                          'name': 'Exchange1'}),
+            ('binding', {'name':           'binding1',
+                         'exchangeName':   'Exchange1',
+                         'bindingKey':     'a.b',
+                         'nextHopAddress': 'nextHop1'})
+        ]
+        router = self.tester.qdrouterd('QDR.mcast2', Qdrouterd.Config(config))
+
+        # create clients for message transfer
+        conn = BlockingConnection(router.addresses[0])
+        sender = conn.create_sender(address="Address4", options=AtLeastOnce())
+        nhop1 = _AsyncReceiver(address=router.addresses[0], source="nextHop1")
+
+        self.assertRaises(SendException,
+                          sender.send,
+                          Message(subject='a.b', body='A'))
+        nhop1.stop()
+        conn.close()
+
+        self.assertTrue(nhop1.queue.empty())
+
+    def test_remote_exchange(self):
+        """
+        Verify that the exchange and bindings are visible to other routers in
+        the network
+        """
+        def router(self, name, extra_config):
+
+            config = [
+                ('router', {'mode': 'interior', 'id': 'QDR.%s'%name, 'allowUnsettledMulticast': 'yes'}),
+                ('listener', {'port': self.tester.get_port(), 'stripAnnotations': 'no'})
+            ] + extra_config
+
+            config = Qdrouterd.Config(config)
+
+            self.routers.append(self.tester.qdrouterd(name, config, wait=True))
+
+        self.inter_router_port = self.tester.get_port()
+        self.routers = []
+
+        router(self, 'A',
+               [('listener',
+                 {'role': 'inter-router', 'port': self.inter_router_port}),
+
+                ('address', {'pattern': 'nextHop1/#',
+                             'distribution': 'multicast'}),
+                ('address', {'pattern': 'nextHop2/#',
+                             'distribution': 'balanced'}),
+                ('address', {'pattern': 'nextHop3/#',
+                             'distribution': 'closest'}),
+
+                ('exchange', {'address': 'AddressA',
+                              'name': 'ExchangeA',
+                              'matchMethod': 'mqtt'}),
+
+                ('binding', {'name':           'bindingA1',
+                             'exchangeName':   'ExchangeA',
+                             'bindingKey':     'a/b',
+                             'nextHopAddress': 'nextHop1'}),
+                ('binding', {'name':           'bindingA2',
+                             'exchangeName':   'ExchangeA',
+                             'bindingKey':     'a/+',
+                             'nextHopAddress': 'nextHop2'}),
+                ('binding', {'name':           'bindingA3',
+                             'exchangeName':   'ExchangeA',
+                             'bindingKey':     '+/b',
+                             'nextHopAddress': 'nextHop3'}),
+                ('binding', {'name':           'bindingA4',
+                             'exchangeName':   'ExchangeA',
+                             'bindingKey':     'a/#',
+                             'nextHopAddress': 'NotSubscribed'})
+               ])
+
+        router(self, 'B',
+               [('connector', {'name': 'connectorToA',
+                               'role': 'inter-router',
+                               'port': self.inter_router_port,
+                               'verifyHostName': 'no'}),
+                ('address', {'pattern': 'nextHop1/#',
+                             'distribution': 'multicast'}),
+                ('address', {'pattern': 'nextHop2/#',
+                             'distribution': 'balanced'}),
+                ('address', {'pattern': 'nextHop3/#',
+                             'distribution': 'closest'})
+               ])
+
+        self.routers[0].wait_router_connected('QDR.B')
+        self.routers[1].wait_router_connected('QDR.A')
+        self.routers[1].wait_address('AddressA')
+
+        # connect clients to router B (no exchange)
+        nhop1A = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop1')
+        nhop1B = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop1')
+        nhop2  = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop2')
+        nhop3  = _AsyncReceiver(self.routers[1].addresses[0], 'nextHop3')
+
+        self.routers[0].wait_address('nextHop1', remotes=1)
+        self.routers[0].wait_address('nextHop2', remotes=1)
+        self.routers[0].wait_address('nextHop3', remotes=1)
+
+        conn = BlockingConnection(self.routers[1].addresses[0])
+        sender = conn.create_sender(address="AddressA", options=AtLeastOnce())
+        sender.send(Message(subject='a/b', body='Hi!'))
+
+        # multicast
+        self.assertEqual('Hi!', nhop1A.queue.get(timeout=TIMEOUT).body)
+        self.assertEqual('Hi!', nhop1B.queue.get(timeout=TIMEOUT).body)
+
+        # balanced and closest
+        self.assertEqual('Hi!', nhop2.queue.get(timeout=TIMEOUT).body)
+        self.assertEqual('Hi!', nhop3.queue.get(timeout=TIMEOUT).body)
+
+        nhop1A.stop()
+        nhop1B.stop()
+        nhop2.stop()
+        nhop3.stop()
+        conn.close()
+
+    def test_large_messages(self):
+        """
+        Verify that multi-frame messages are forwarded properly
+        """
+        MAX_FRAME=1024
+        config = [
+            ('router', {'mode': 'interior', 'id': 'QDR.X',
+                        'allowUnsettledMulticast': 'yes'}),
+            ('listener', {'port': self.tester.get_port(),
+                          'stripAnnotations': 'no',
+                          'maxFrameSize': MAX_FRAME}),
+
+            ('address', {'pattern': 'nextHop1/#',
+                         'distribution': 'multicast'}),
+
+            ('exchange', {'address': 'AddressA',
+                          'name': 'ExchangeA'}),
+
+            ('binding', {'name':           'bindingA1',
+                         'exchangeName':   'ExchangeA',
+                         'bindingKey':     'a/b',
+                         'nextHopAddress': 'nextHop1'})
+        ]
+
+        router = self.tester.qdrouterd('QDR.X',
+                                       Qdrouterd.Config(config),
+                                       wait=True)
+
+        # connect clients to router B (no exchange)
+        nhop1A = _AsyncReceiver(router.addresses[0], 'nextHop1',
+                                conn_args={'max_frame_size': MAX_FRAME})
+        nhop1B = _AsyncReceiver(router.addresses[0], 'nextHop1',
+                                conn_args={'max_frame_size': MAX_FRAME})
+
+        conn = BlockingConnection(router.addresses[0],
+                                  max_frame_size=MAX_FRAME)
+        sender = conn.create_sender(address="AddressA")
+        jumbo = (10 * MAX_FRAME) * 'X'
+        sender.send(Message(subject='a/b', body=jumbo))
+
+        # multicast
+        self.assertEqual(jumbo, nhop1A.queue.get(timeout=TIMEOUT).body)
+        self.assertEqual(jumbo, nhop1B.queue.get(timeout=TIMEOUT).body)
+
+        nhop1A.stop()
+        nhop1B.stop()
+        conn.close()
+
+    def test_forwarding_fanout(self):
+        """
+        Verify bindings that do not have a key receive all messages
+        """
+        config = [
+            ('exchange', {'address': 'AddressF',
+                          'name': 'ExchangeF'}),
+            ('binding', {'name':           'binding1',
+                         'exchangeName':   'ExchangeF',
+                         'bindingKey':     'pattern',
+                         'nextHopAddress': 'nextHop1'}),
+            # two bindings w/o key
+            ('binding', {'name':           'binding2',
+                         'exchangeName':   'ExchangeF',
+                         'nextHopAddress': 'nextHop2'}),
+            ('binding', {'name':           'binding3',
+                         'exchangeName':   'ExchangeF',
+                         'nextHopAddress': 'nextHop3'})
+        ]
+
+        for meth in ['amqp', 'mqtt']:
+            config[0][1]['matchMethod'] = meth
+            router = self._create_router('A', config)
+
+            # create clients for message transfer
+            conn = BlockingConnection(router.addresses[0])
+            sender = conn.create_sender(address="AddressF", options=AtMostOnce())
+            nhop1 = conn.create_receiver(address="nextHop1", credit=100)
+            nhop2 = conn.create_receiver(address="nextHop2", credit=100)
+            nhop3 = conn.create_receiver(address="nextHop3", credit=100)
+
+            # send message with subject "nope"
+            # should arrive at nextHop2 & 3 only
+            sender.send(Message(subject='nope', body='A'))
+            self.assertEqual('A', nhop2.receive(timeout=TIMEOUT).body)
+            self.assertEqual('A', nhop3.receive(timeout=TIMEOUT).body)
+
+            # send message with subject "pattern"
+            # forwarded to all bindings:
+            sender.send(Message(subject='pattern', body='B'))
+            self.assertEqual('B', nhop1.receive(timeout=TIMEOUT).body)
+            self.assertEqual('B', nhop2.receive(timeout=TIMEOUT).body)
+            self.assertEqual('B', nhop3.receive(timeout=TIMEOUT).body)
+
+            conn.close()
+            router.teardown()
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/tests/system_tests_qdmanage.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_qdmanage.py b/tests/system_tests_qdmanage.py
index c6fa186..521ab24 100644
--- a/tests/system_tests_qdmanage.py
+++ b/tests/system_tests_qdmanage.py
@@ -177,15 +177,15 @@ class QdmanageTest(TestCase):
 
     def test_get_types(self):
         out = json.loads(self.run_qdmanage("get-types"))
-        self.assertEqual(len(out), 26)
+        self.assertEqual(len(out), 28)
 
     def test_get_attributes(self):
         out = json.loads(self.run_qdmanage("get-attributes"))
-        self.assertEqual(len(out), 26)
+        self.assertEqual(len(out), 28)
 
     def test_get_operations(self):
         out = json.loads(self.run_qdmanage("get-operations"))
-        self.assertEqual(len(out), 26)
+        self.assertEqual(len(out), 28)
         self.assertEqual(out['org.apache.qpid.dispatch.sslProfile'], [u'CREATE', u'DELETE', u'READ'])
 
     def test_get_types_with_ssl_profile_type(self):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-dispatch git commit: DISPATCH-89: Exchange Binding forwarder

Posted by kg...@apache.org.
DISPATCH-89: Exchange Binding forwarder

Adds a forwarder that models the pre AMQP 1.0 model of exchanges and
bindings.  Exchanges can be added via management and bound to outgoing
target addresses.  MQTT wildcard patterns are also supported.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fff61db8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fff61db8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fff61db8

Branch: refs/heads/master
Commit: fff61db817fa310ff01061f288c5f86f8d5be262
Parents: dfa5fd4
Author: Kenneth Giusti <kg...@apache.org>
Authored: Tue Jan 3 13:14:19 2017 -0500
Committer: Kenneth Giusti <kg...@apache.org>
Committed: Wed Mar 21 16:26:17 2018 -0400

----------------------------------------------------------------------
 python/qpid_dispatch/management/qdrouter.json   |  100 ++
 python/qpid_dispatch_internal/dispatch.py       |    2 +
 .../qpid_dispatch_internal/management/agent.py  |   14 +
 .../qpid_dispatch_internal/management/config.py |   16 +-
 src/CMakeLists.txt                              |    1 +
 src/dispatch.c                                  |   12 +
 src/iterator.c                                  |    6 +-
 src/parse_tree.c                                |   12 +-
 src/parse_tree.h                                |    3 +-
 src/router_config.c                             |  107 ++
 src/router_core/agent.c                         |   29 +-
 src/router_core/connections.c                   |    9 +-
 src/router_core/exchange_bindings.c             | 1332 ++++++++++++++++++
 src/router_core/exchange_bindings.h             |   70 +
 src/router_core/forwarder.c                     |   28 +-
 src/router_core/forwarder.h                     |   48 +
 src/router_core/management_agent.c              |   22 +-
 src/router_core/router_core.c                   |    4 +
 src/router_core/router_core_private.h           |   10 +-
 src/router_core/transfer.c                      |   10 +-
 src/router_private.h                            |    2 +
 tests/CMakeLists.txt                            |    1 +
 tests/field_test.c                              |   66 +-
 tests/parse_tree_tests.c                        |   21 +-
 tests/system_tests_exchange_bindings.py         |  780 ++++++++++
 tests/system_tests_qdmanage.py                  |    6 +-
 26 files changed, 2612 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index e1a26ea..a5262f7 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1188,6 +1188,106 @@
             }
         },
 
+        "router.config.exchange": {
+            "description":"[EXPERIMENTAL] Defines a topic exchange.",
+            "extends": "configurationEntity",
+            "operations": ["CREATE", "DELETE"],
+            "attributes": {
+                "address": {
+                    "description": "The address of the exchange. Used by the message publisher as the target for sending messages.",
+                    "type": "string",
+                    "create": true,
+                    "required": true
+                },
+                "phase": {
+                    "type": "integer",
+                    "description": "The address phase for the exchange.  Defaults to '0'.",
+                    "create": true,
+                    "required": false
+                },
+                "alternateAddress": {
+                    "description": "The address to forward the message to if no bindings are matched.",
+                    "type": "string",
+                    "create": true,
+                    "required": false
+                },
+                "alternatePhase": {
+                    "type": "integer",
+                    "description": "The address phase for the alternateAddress.  Defaults to '0'.",
+                    "create": true,
+                    "required": false
+                },
+                "matchMethod": {
+                    "description": "Key matching algorithm used. 'amqp' uses the legacy AMQP topic exchange wildcard match method as described in the pre-1.0 drafts. 'mqtt' uses the MQTT topic filter wildcard match method.",
+                    "type": ["amqp", "mqtt"],
+                    "default": "amqp",
+                    "required": false,
+                    "create": true
+                },
+                "bindingCount": {
+                    "description": "The number of bindings associated with this exchange.",
+                    "type": "integer",
+                    "create": false
+                },
+                "receivedCount": {
+                    "description": "The total number of deliveries received by this exchange.",
+                    "type": "integer",
+                    "create": false
+                },
+                "droppedCount": {
+                    "description": "The total number of deliveries dropped due to the lack of an outgoing subscription.",
+                    "type": "integer",
+                    "create": false
+                },
+                "forwardedCount": {
+                    "description": "The total number of deliveries forwarded via matched bindings or to the alternateAddress",
+                    "type": "integer",
+                    "create": false
+                },
+                "divertedCount": {
+                    "description": "A count of those deliveries that were forwarded via the alternateAddress only.  This is a subset of the forwardedCount.",
+                    "type": "integer",
+                    "create": false
+                }
+            }
+        },
+
+        "router.config.binding": {
+            "description":"[EXPERIMENTAL] Defines a keyed next hop binding for a topic exchange. The subject field of the messages arriving at the exchange is compared against the binding's key value using the exchange's matchMethod.  If the subject matches the key the message is forwarded to the nextHopAddress. The nextHopAddress overrides the message's original destination.",
+            "extends": "configurationEntity",
+            "operations": ["CREATE", "DELETE"],
+            "attributes": {
+                "exchangeName": {
+                    "description": "The name of the exchange to bind.",
+                    "type": "string",
+                    "create": true,
+                    "required": true
+                },
+                "bindingKey": {
+                    "description": "Pattern to compare against incoming message's subject.  The key is a string of zero or more tokens and wildcards. The format depends on the matchMethod configured for the exchange. For AMQP each token is delimited by the '.' character and wild-card tokens '*' matches a single token and '#' matches zero or more tokens. For MQTT each token is delimited by the '/' character and wildcard tokens '+' matches a single token and '#' matches zero or more tokens at the end of the topic. If a key is not provided the binding will match all messages arriving at the exchange (fanout behavior).",
+                    "type": "string",
+                    "create": true,
+                    "required": false
+                },
+                "nextHopAddress": {
+                    "description": "The address to forward the message to when the message's topic string matches the binding key pattern.  This address is used by message consumers as the source of incoming messages.",
+                    "type": "string",
+                    "create": true,
+                    "required": true
+                },
+                "nextHopPhase": {
+                    "type": "integer",
+                    "description": "The address phase used when forwarding messages that match this binding.",
+                    "create": true,
+                    "required": false
+                },
+                "matchedCount": {
+                    "description": "Total number of deliveries that matched this binding.",
+                    "type": "integer"
+                }
+            }
+        },
+
         "router.link": {
             "description": "Link to another AMQP endpoint: router node, client or other AMQP process.",
             "extends": "operationalEntity",

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/python/qpid_dispatch_internal/dispatch.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/dispatch.py b/python/qpid_dispatch_internal/dispatch.py
index 758a4a3..4c9123b 100644
--- a/python/qpid_dispatch_internal/dispatch.py
+++ b/python/qpid_dispatch_internal/dispatch.py
@@ -70,6 +70,8 @@ class QdDll(ctypes.PyDLL):
         self._prototype(self.qd_dispatch_configure_address, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_link_route, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_configure_auto_link, None, [self.qd_dispatch_p, py_object])
+        self._prototype(self.qd_dispatch_configure_exchange, None, [self.qd_dispatch_p, py_object])
+        self._prototype(self.qd_dispatch_configure_binding, None, [self.qd_dispatch_p, py_object])
 
         self._prototype(self.qd_dispatch_configure_policy, None, [self.qd_dispatch_p, py_object])
         self._prototype(self.qd_dispatch_register_policy_manager, None, [self.qd_dispatch_p, py_object])

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index 177d803..fc4bbae 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -487,6 +487,20 @@ class AllocatorEntity(EntityAdapter):
     def __str__(self):
         return super(AllocatorEntity, self).__str__().replace("Entity(", "AllocatorEntity(")
 
+class ExchangeEntity(EntityAdapter):
+    def create(self):
+        self._qd.qd_dispatch_configure_exchange(self._dispatch, self)
+
+    def __str__(self):
+        return super(ExchangeEntity, self).__str__().replace("Entity(", "ExchangeEntity(")
+
+class BindingEntity(EntityAdapter):
+    def create(self):
+        self._qd.qd_dispatch_configure_binding(self._dispatch, self)
+
+    def __str__(self):
+        return super(BindingEntity, self).__str__().replace("Entity(", "BindingEntity(")
+
 
 class EntityCache(object):
     """

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/python/qpid_dispatch_internal/management/config.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py
index 5ffa3c4..ecda9c6 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -52,6 +52,8 @@ class Config(object):
             if s[0] == "address":   s[0] = "router.config.address"
             if s[0] == "linkRoute": s[0] = "router.config.linkRoute"
             if s[0] == "autoLink":  s[0] = "router.config.autoLink"
+            if s[0] == "exchange":  s[0] = "router.config.exchange"
+            if s[0] == "binding":   s[0] = "router.config.binding"
 
     @staticmethod
     def _parse(lines):
@@ -59,17 +61,20 @@ class Config(object):
         begin = re.compile(r'([\w-]+)[ \t]*{') # WORD {
         end = re.compile(r'}')                 # }
         attr = re.compile(r'([\w-]+)[ \t]*:[ \t]*(.+)') # WORD1: VALUE
-        pattern = re.compile(r'([\w-]+)[ \t]*:[ \t]*([\S]+).*')
+
+        # The 'pattern:' and 'bindingKey:' attributes in the schema are special
+        # snowflakes. They allow '#' characters in their value, so they cannot
+        # be treated as comment delimiters
+        special_snowflakes = ['pattern', 'bindingKey']
+        hash_ok = re.compile(r'([\w-]+)[ \t]*:[ \t]*([\S]+).*')
 
         def sub(line):
             """Do substitutions to make line json-friendly"""
             line = line.strip()
             if line.startswith("#"):
                 return ""
-            # 'pattern:' is a special snowflake.  It allows '#' characters in
-            # its value, so they cannot be treated as comment delimiters
-            if line.split(':')[0].strip().lower() == "pattern":
-                line = re.sub(pattern, r'"\1": "\2",', line)
+            if line.split(':')[0].strip() in special_snowflakes:
+                line = re.sub(hash_ok, r'"\1": "\2",', line)
             else:
                 line = line.split('#')[0].strip()
                 line = re.sub(begin, r'["\1", {', line)
@@ -174,6 +179,7 @@ def configure_dispatch(dispatch, lib_handle, filename):
     # Remaining configuration
     for t in "sslProfile", "authServicePlugin", "listener", "connector", \
              "router.config.address", "router.config.linkRoute", "router.config.autoLink", \
+             "router.config.exchange", "router.config.binding", \
              "policy", "vhost":
         for a in config.by_type(t):
             configure(a)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 627f6da..ac85bca 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -86,6 +86,7 @@ set(qpid_dispatch_SOURCES
   router_core/agent_router.c
   router_core/connections.c
   router_core/error.c
+  router_core/exchange_bindings.c
   router_core/forwarder.c
   router_core/route_control.c
   router_core/router_core.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index 7564c0b..b9a6b88 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -222,6 +222,18 @@ qd_error_t qd_dispatch_configure_auto_link(qd_dispatch_t *qd, qd_entity_t *entit
     return qd_error_code();
 }
 
+qd_error_t qd_dispatch_configure_exchange(qd_dispatch_t *qd, qd_entity_t *entity) {
+    if (!qd->router) return qd_error(QD_ERROR_NOT_FOUND, "No router available");
+    qd_router_configure_exchange(qd->router, entity);
+    return qd_error_code();
+}
+
+qd_error_t qd_dispatch_configure_binding(qd_dispatch_t *qd, qd_entity_t *entity) {
+    if (!qd->router) return qd_error(QD_ERROR_NOT_FOUND, "No router available");
+    qd_router_configure_binding(qd->router, entity);
+    return qd_error_code();
+}
+
 qd_error_t qd_dispatch_configure_policy(qd_dispatch_t *qd, qd_entity_t *entity)
 {
     qd_error_t err;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index bed3ed8..ccad7c0 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -747,8 +747,12 @@ qd_iterator_t *qd_iterator_dup(const qd_iterator_t *iter)
         return 0;
 
     qd_iterator_t *dup = new_qd_iterator_t();
-    if (dup)
+    if (dup) {
         *dup = *iter;
+        // drop any references to the hash segments to avoid potential double
+        // free
+        DEQ_INIT(dup->hash_segments);
+    }
     return dup;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/parse_tree.c
----------------------------------------------------------------------
diff --git a/src/parse_tree.c b/src/parse_tree.c
index 31be640..256b838 100644
--- a/src/parse_tree.c
+++ b/src/parse_tree.c
@@ -695,15 +695,15 @@ bool qd_parse_tree_walk(qd_parse_tree_t *node, qd_parse_tree_visit_t *callback,
 }
 
 
-bool qd_parse_tree_validate_pattern(qd_parse_tree_type_t type,
+bool qd_parse_tree_validate_pattern(const qd_parse_tree_t *tree,
                                     const qd_iterator_t *pattern)
 {
-    switch (type) {
+    switch (tree->type) {
     case QD_PARSE_TREE_MQTT: {
         // simply ensure that if a '#' is present it is the last token in the
         // pattern
         token_iterator_t ti;
-        bool valid = false;
+        bool valid = true;
         qd_iterator_t *dup = qd_iterator_dup(pattern);
         char *str = (char *)qd_iterator_copy(dup);
         qd_iterator_free(dup);
@@ -729,6 +729,12 @@ bool qd_parse_tree_validate_pattern(qd_parse_tree_type_t type,
 }
 
 
+qd_parse_tree_type_t qd_parse_tree_type(const qd_parse_tree_t *tree)
+{
+    return tree->type;
+}
+
+
 #if 0
 #include <stdio.h>
 void qd_parse_tree_dump(qd_parse_node_t *node, int depth)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/parse_tree.h
----------------------------------------------------------------------
diff --git a/src/parse_tree.h b/src/parse_tree.h
index 9b7126e..1b586ce 100644
--- a/src/parse_tree.h
+++ b/src/parse_tree.h
@@ -53,9 +53,10 @@ typedef enum {
 
 qd_parse_tree_t *qd_parse_tree_new(qd_parse_tree_type_t type);
 void qd_parse_tree_free(qd_parse_tree_t *tree);
+qd_parse_tree_type_t qd_parse_tree_type(const qd_parse_tree_t *tree);
 
 // verify the pattern is in a legal format for the given tree's match algorithm
-bool qd_parse_tree_validate_pattern(qd_parse_tree_type_t type,
+bool qd_parse_tree_validate_pattern(const qd_parse_tree_t *tree,
                                     const qd_iterator_t *pattern);
 
 // returns old payload or NULL if new

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_config.c
----------------------------------------------------------------------
diff --git a/src/router_config.c b/src/router_config.c
index da5bb97..94758df 100644
--- a/src/router_config.c
+++ b/src/router_config.c
@@ -301,6 +301,113 @@ qd_error_t qd_router_configure_auto_link(qd_router_t *router, qd_entity_t *entit
 }
 
 
+qd_error_t qd_router_configure_exchange(qd_router_t *router, qd_entity_t *entity)
+{
+
+    char *name      = 0;
+    char *address   = 0;
+    char *alternate = 0;
+    char *method    = 0;
+
+    do {
+        long phase   = qd_entity_opt_long(entity, "phase", 0);              QD_ERROR_BREAK();
+        long alt_phase = qd_entity_opt_long(entity, "alternatePhase", 0);   QD_ERROR_BREAK();
+        name         = qd_entity_get_string(entity, "name");                QD_ERROR_BREAK();
+        address      = qd_entity_get_string(entity, "address");             QD_ERROR_BREAK();
+        alternate    = qd_entity_opt_string(entity, "alternateAddress", 0); QD_ERROR_BREAK();
+        method       = qd_entity_opt_string(entity, "matchMethod", 0);      QD_ERROR_BREAK();
+
+        qd_composed_field_t *body = qd_compose_subfield(0);
+        qd_compose_start_map(body);
+
+        qd_compose_insert_string(body, "name");
+        qd_compose_insert_string(body, name);
+
+        qd_compose_insert_string(body, "address");
+        qd_compose_insert_string(body, address);
+
+        qd_compose_insert_string(body, "phase");
+        qd_compose_insert_int(body, phase);
+
+        if (alternate) {
+            qd_compose_insert_string(body, "alternateAddress");
+            qd_compose_insert_string(body, alternate);
+            qd_compose_insert_string(body, "alternatePhase");
+            qd_compose_insert_int(body, alt_phase);
+        }
+
+        qd_compose_insert_string(body, "matchMethod");
+        if (method)
+            qd_compose_insert_string(body, method);
+        else
+            qd_compose_insert_string(body, "amqp");
+
+        qd_compose_end_map(body);
+
+        qdi_router_configure_body(router->router_core, body, QD_ROUTER_EXCHANGE, name);
+        qd_compose_free(body);
+    } while(0);
+
+    free(name);
+    free(address);
+    free(alternate);
+    free(method);
+
+    return qd_error_code();
+}
+
+
+qd_error_t qd_router_configure_binding(qd_router_t *router, qd_entity_t *entity)
+{
+    char *name     = 0;
+    char *exchange = 0;
+    char *key      = 0;
+    char *next_hop = 0;
+
+    do {
+        long phase = qd_entity_opt_long(entity, "nextHopPhase", 0);   QD_ERROR_BREAK();
+        name       = qd_entity_opt_string(entity, "name", 0);         QD_ERROR_BREAK();
+        exchange   = qd_entity_get_string(entity, "exchangeName");    QD_ERROR_BREAK();
+        key        = qd_entity_opt_string(entity, "bindingKey", 0);   QD_ERROR_BREAK();
+        next_hop   = qd_entity_get_string(entity, "nextHopAddress");  QD_ERROR_BREAK();
+
+        qd_composed_field_t *body = qd_compose_subfield(0);
+        qd_compose_start_map(body);
+
+        if (name) {
+            qd_compose_insert_string(body, "name");
+            qd_compose_insert_string(body, name);
+        }
+
+        qd_compose_insert_string(body, "exchangeName");
+        qd_compose_insert_string(body, exchange);
+
+        if (key) {
+            qd_compose_insert_string(body, "bindingKey");
+            qd_compose_insert_string(body, key);
+        }
+
+        qd_compose_insert_string(body, "nextHopAddress");
+        qd_compose_insert_string(body, next_hop);
+
+        qd_compose_insert_string(body, "nextHopPhase");
+        qd_compose_insert_int(body, phase);
+
+        qd_compose_end_map(body);
+
+        qdi_router_configure_body(router->router_core, body, QD_ROUTER_BINDING, name);
+        qd_compose_free(body);
+    } while(0);
+
+    free(name);
+    free(exchange);
+    free(key);
+    free(next_hop);
+
+    return qd_error_code();
+}
+
+
 void qd_router_configure_free(qd_router_t *router)
 {
     if (!router) return;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index e53beae..c4dc50d 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -26,6 +26,7 @@
 #include "agent_router.h"
 #include "agent_connection.h"
 #include "router_core_private.h"
+#include "exchange_bindings.h"
 #include <stdio.h>
 
 static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
@@ -194,8 +195,8 @@ qdr_query_t *qdr_manage_query(qdr_core_t              *core,
     case QD_ROUTER_LINK:              qdr_agent_set_columns(query, attribute_names, qdr_link_columns, QDR_LINK_COLUMN_COUNT);  break;
     case QD_ROUTER_ADDRESS:           qdr_agent_set_columns(query, attribute_names, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break;
     case QD_ROUTER_FORBIDDEN:         break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
+    case QD_ROUTER_EXCHANGE:          qdr_agent_set_columns(query, attribute_names, qdr_config_exchange_columns, QDR_CONFIG_EXCHANGE_COLUMN_COUNT); break;
+    case QD_ROUTER_BINDING:           qdr_agent_set_columns(query, attribute_names, qdr_config_binding_columns, QDR_CONFIG_BINDING_COLUMN_COUNT); break;
     }
 
     return query;
@@ -213,8 +214,8 @@ void qdr_query_add_attribute_names(qdr_query_t *query)
     case QD_ROUTER_LINK:              qdr_agent_emit_columns(query, qdr_link_columns, QDR_LINK_COLUMN_COUNT); break;
     case QD_ROUTER_ADDRESS:           qdr_agent_emit_columns(query, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break;
     case QD_ROUTER_FORBIDDEN:         qd_compose_empty_list(query->body); break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
+    case QD_ROUTER_EXCHANGE:          qdr_agent_emit_columns(query, qdr_config_exchange_columns, QDR_CONFIG_EXCHANGE_COLUMN_COUNT); break;
+    case QD_ROUTER_BINDING:           qdr_agent_emit_columns(query, qdr_config_binding_columns, QDR_CONFIG_BINDING_COLUMN_COUNT); break;
     }
 }
 
@@ -354,8 +355,8 @@ static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool disc
     case QD_ROUTER_LINK:              break;
     case QD_ROUTER_ADDRESS:           qdra_address_get_CT(core, name, identity, query, qdr_address_columns); break;
     case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
+    case QD_ROUTER_EXCHANGE:          qdra_config_exchange_get_CT(core, name, identity, query, qdr_config_exchange_columns); break;
+    case QD_ROUTER_BINDING:           qdra_config_binding_get_CT(core, name, identity, query, qdr_config_binding_columns); break;
    }
 
     qdr_field_free(action->args.agent.name);
@@ -379,8 +380,8 @@ static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool di
     case QD_ROUTER_LINK:              break;
     case QD_ROUTER_ADDRESS:           break;
     case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
+    case QD_ROUTER_EXCHANGE:          qdra_config_exchange_create_CT(core, name, query, in_body); break;
+    case QD_ROUTER_BINDING:           qdra_config_binding_create_CT(core, name, query, in_body); break;
 
    }
 
@@ -405,8 +406,8 @@ static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool di
     case QD_ROUTER_LINK:              break;
     case QD_ROUTER_ADDRESS:           break;
     case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, false); break;
-    case QD_ROUTER_EXCHANGE:          break;
-    case QD_ROUTER_BINDING:           break;
+    case QD_ROUTER_EXCHANGE:          qdra_config_exchange_delete_CT(core, query, name, identity); break;
+    case QD_ROUTER_BINDING:           qdra_config_binding_delete_CT(core, query, name, identity); break;
    }
 
    qdr_field_free(action->args.agent.name);
@@ -456,8 +457,8 @@ static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool
         case QD_ROUTER_LINK:              qdra_link_get_first_CT(core, query, offset); break;
         case QD_ROUTER_ADDRESS:           qdra_address_get_first_CT(core, query, offset); break;
         case QD_ROUTER_FORBIDDEN:         qdr_agent_forbidden(core, query, true); break;
-        case QD_ROUTER_EXCHANGE:          break;
-        case QD_ROUTER_BINDING:           break;
+        case QD_ROUTER_EXCHANGE:          qdra_config_exchange_get_first_CT(core, query, offset); break;
+        case QD_ROUTER_BINDING:           qdra_config_binding_get_first_CT(core, query, offset); break;
         }
     }
 }
@@ -477,8 +478,8 @@ static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool
         case QD_ROUTER_LINK:              qdra_link_get_next_CT(core, query); break;
         case QD_ROUTER_ADDRESS:           qdra_address_get_next_CT(core, query); break;
         case QD_ROUTER_FORBIDDEN:         break;
-        case QD_ROUTER_EXCHANGE:          break;
-        case QD_ROUTER_BINDING:           break;
+        case QD_ROUTER_EXCHANGE:          qdra_config_exchange_get_next_CT(core, query); break;
+        case QD_ROUTER_BINDING:           qdra_config_binding_get_next_CT(core, query); break;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 7add23a..4598312 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1460,12 +1460,17 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                     qdr_link_outbound_second_attach_CT(core, link, source, target);
 
                     //
-                    // Issue the initial credit only if there are destinations for the address or if the address treatment is multicast.
+                    // Issue the initial credit only if one of the following
+                    // holds:
+                    // - there are destinations for the address
+                    // - if the address treatment is multicast
+                    // - the address is that of an exchange (no subscribers allowed)
                     //
                     if (DEQ_SIZE(addr->subscriptions)
                             || DEQ_SIZE(addr->rlinks)
                             || qd_bitmask_cardinality(addr->rnodes)
-                            || qdr_is_addr_treatment_multicast(addr)) {
+                            || qdr_is_addr_treatment_multicast(addr)
+                            || !!addr->exchange) {
                         qdr_link_issue_credit_CT(core, link, link->capacity, false);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_core/exchange_bindings.c
----------------------------------------------------------------------
diff --git a/src/router_core/exchange_bindings.c b/src/router_core/exchange_bindings.c
new file mode 100644
index 0000000..02bf192
--- /dev/null
+++ b/src/router_core/exchange_bindings.c
@@ -0,0 +1,1332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <qpid/dispatch/ctools.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include "router_core_private.h"
+#include "forwarder.h"
+#include "exchange_bindings.h"
+
+
+// next_hop_t
+// Describes the destination of a forwarded message
+// May be shared by different bindings
+//
+typedef struct next_hop_t next_hop_t;
+struct next_hop_t
+{
+    // per-exchange list of all next hops
+    DEQ_LINKS_N(exchange_list, next_hop_t);
+    // when hooked to the transmit list
+    DEQ_LINKS_N(transmit_list, next_hop_t);
+
+    int                  ref_count;  // binding references
+    int                  phase;
+    bool                 on_xmit_list;
+    qdr_exchange_t      *exchange;
+    unsigned char       *next_hop;
+    qdr_address_t       *qdr_addr;
+};
+
+ALLOC_DECLARE(next_hop_t);
+ALLOC_DEFINE(next_hop_t);
+DEQ_DECLARE(next_hop_t, next_hop_list_t);
+
+// qdr_binding_t
+// Represents a subject key --> next hop mapping
+// A binding is uniquely identified by the tuple (pattern, nextHop, phase).  No
+// two bindings with the same tuple value can exist on an exchange.
+// The binding is implemented using two classes: qdr_binding_t and
+// next_hop_t. The qdr_binding_t holds the pattern and points to the
+// next_hop_t.  This allows different patterns to share the same nextHop.
+// Since there is only one next_hop_t instance for each (nextHop, phase) value,
+// we guarantee only 1 copy of a message is forwarded to a given nextHop+phase
+// even if multiple distinct patterns are matched. Ex: a message with a
+// value of "a.b" will match two distict binding keys "+.b" and "a.+".  If
+// both these patterns share the same next_hop_t only 1 copy of the message
+// will be forwarded.
+typedef struct qdr_binding qdr_binding_t;
+struct qdr_binding
+{
+    // per-exchange list of all bindings
+    DEQ_LINKS_N(exchange_list, qdr_binding_t);
+    // parse tree node's list of bindings sharing the same pattern
+    DEQ_LINKS_N(tree_list, qdr_binding_t);
+
+    unsigned char       *name;
+    uint64_t             identity;
+    qdr_exchange_t      *exchange;
+
+    unsigned char       *key;
+    next_hop_t          *next_hop;
+
+    uint64_t             msgs_matched;
+};
+
+ALLOC_DECLARE(qdr_binding_t);
+ALLOC_DEFINE(qdr_binding_t);
+DEQ_DECLARE(qdr_binding_t, qdr_binding_list_t);
+
+
+struct qdr_exchange {
+    DEQ_LINKS(qdr_exchange_t);          // for core->exchanges
+    qdr_core_t         *core;
+    uint64_t            identity;
+    unsigned char      *name;
+    unsigned char      *address;
+    int                 phase;
+    qd_parse_tree_t    *parse_tree;
+    qdr_address_t      *qdr_addr;
+    next_hop_t         *alternate;
+    qdr_binding_list_t  bindings;
+    next_hop_list_t     next_hops;
+    qdr_forwarder_t    *old_forwarder;
+
+    uint64_t msgs_received;
+    uint64_t msgs_dropped;
+    uint64_t msgs_routed;
+    uint64_t msgs_alternate;
+};
+
+ALLOC_DECLARE(qdr_exchange_t);
+ALLOC_DEFINE(qdr_exchange_t);
+
+static void qdr_exchange_free(qdr_exchange_t *ex);
+static qdr_exchange_t *qdr_exchange(qdr_core_t    *core,
+                                    qd_iterator_t *name,
+                                    qd_iterator_t *address,
+                                    int            phase,
+                                    qd_iterator_t *alternate,
+                                    int            alt_phase,
+                                    qd_parse_tree_type_t method);
+static void write_config_exchange_map(qdr_exchange_t      *ex,
+                                      qd_composed_field_t *body);
+static qdr_exchange_t *find_exchange(qdr_core_t    *core,
+                                     qd_iterator_t *identity,
+                                     qd_iterator_t *name);
+static qdr_binding_t *find_binding(qdr_core_t *core,
+                                   qd_iterator_t  *identity,
+                                   qd_iterator_t  *name);
+static void write_config_exchange_list(qdr_exchange_t *ex,
+                                       qdr_query_t    *query);
+static qdr_binding_t *qdr_binding(qdr_exchange_t *ex,
+                                  qd_iterator_t  *name,
+                                  qd_iterator_t  *key,
+                                  qd_iterator_t  *next_hop,
+                                  int             phase);
+static void write_config_binding_map(qdr_binding_t       *binding,
+                                     qd_composed_field_t *body);
+static qdr_binding_t *find_binding(qdr_core_t    *core,
+                                   qd_iterator_t *identity,
+                                   qd_iterator_t *name);
+static void qdr_binding_free(qdr_binding_t *b);
+static void write_config_binding_list(qdr_binding_t *binding,
+                                      qdr_query_t   *query);
+static qdr_binding_t *get_binding_at_index(qdr_core_t *core,
+                                           int         index);
+static next_hop_t *next_hop(qdr_exchange_t *ex,
+                            qd_iterator_t  *address,
+                            int             phase);
+static void next_hop_release(next_hop_t *next_hop);
+static next_hop_t *find_next_hop(qdr_exchange_t *ex,
+                                 qd_iterator_t  *address,
+                                 int             phase);
+static bool gather_next_hops(void *handle,
+                             const char *pattern,
+                             void *payload);
+static int send_message(qdr_core_t     *core,
+                        next_hop_t     *next_hop,
+                        qd_message_t   *msg,
+                        qdr_delivery_t *in_delivery,
+                        bool            exclude_inprocess,
+                        bool            control);
+
+
+//
+// The Exchange Forwarder
+//
+int qdr_forward_exchange_CT(qdr_core_t     *core,
+                            qdr_address_t  *addr,
+                            qd_message_t   *msg,
+                            qdr_delivery_t *in_delivery,
+                            bool            exclude_inprocess,
+                            bool            control)
+{
+    int forwarded = 0;
+    const bool presettled = !!in_delivery ? in_delivery->settled : true;
+    qdr_exchange_t *ex = addr->exchange;
+    assert(ex);
+
+    ex->msgs_received += 1;
+
+    // honor the disposition for the exchange address (this may not be right??)
+    if (ex->old_forwarder)
+        forwarded = ex->old_forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
+
+    // @TODO(kgiusti): de-duplicate this code (cut & paste from multicast
+    // forwarder)
+    //
+    // If the delivery is not presettled, set the settled flag for forwarding so all
+    // outgoing deliveries will be presettled.
+    //
+    // NOTE:  This is the only multicast mode currently supported.  Others will likely be
+    //        implemented in the future.
+    //
+    if (!presettled) {
+        in_delivery->settled = true;
+        //
+        // If the router is configured to reject unsettled multicasts, settle and reject this delivery.
+        //
+        if (!core->qd->allow_unsettled_multicast) {
+            in_delivery->disposition = PN_REJECTED;
+            in_delivery->error = qdr_error("qd:forbidden", "Deliveries to an exchange must be pre-settled");
+            qdr_delivery_push_CT(core, in_delivery);
+            return 0;
+        }
+    }
+
+    qd_iterator_t *subject = qd_message_check(msg, QD_DEPTH_PROPERTIES)
+        ? qd_message_field_iterator(msg, QD_FIELD_SUBJECT)
+        : NULL;
+    next_hop_list_t transmit_list;
+    DEQ_INIT(transmit_list);
+
+    if (subject) {
+        // find all matching bindings and build a list of their next hops
+        qd_parse_tree_search(ex->parse_tree, subject, gather_next_hops, &transmit_list);
+        qd_iterator_free(subject);
+    }
+
+    // if there are valid next hops then we're routing this message based on an
+    // entirely new destination address.  We need to reset the origin and the
+    // excluded link flags in the delivery.  We also need to reset the trace
+    // annotations and ingress field in the message. This is done because it is
+    // possible that the next hop is reached via the same link/router this
+    // message arrived from.
+    // @TODO(kgiusti) - loop detection
+    if (DEQ_SIZE(transmit_list) > 0 || ex->alternate) {
+        if (in_delivery) {
+            in_delivery->origin = 0;
+            qd_bitmask_free(in_delivery->link_exclusion);
+            in_delivery->link_exclusion = 0;
+        }
+
+        const char *node_id = qd_router_id(core->qd);
+        qd_composed_field_t *trace_field = qd_compose_subfield(0);
+        qd_compose_start_list(trace_field);
+        qd_compose_insert_string(trace_field, node_id);
+        qd_compose_end_list(trace_field);
+        qd_message_set_trace_annotation(msg, trace_field);
+
+        qd_composed_field_t *ingress_field = qd_compose_subfield(0);
+        qd_compose_insert_string(ingress_field, node_id);
+        qd_message_set_ingress_annotation(msg, ingress_field);
+    }
+
+    next_hop_t *next_hop = DEQ_HEAD(transmit_list);
+    while (next_hop) {
+        DEQ_REMOVE_N(transmit_list, transmit_list, next_hop);
+        next_hop->on_xmit_list = false;
+        assert(next_hop->qdr_addr);
+        // @TODO(kgiusti) - non-recursive handling of next hop if it is an exchange
+        forwarded += send_message(ex->core, next_hop, msg, in_delivery, exclude_inprocess, control);
+        next_hop = DEQ_HEAD(transmit_list);
+    }
+
+    if (forwarded == 0 && ex->alternate) {
+        forwarded = send_message(ex->core, ex->alternate, msg, in_delivery, exclude_inprocess, control);
+        if (forwarded) {
+            ex->msgs_alternate += 1;
+        }
+    }
+
+    // @TODO(kgiusti): de-duplicate the settlement code (cut & paste from
+    // multicast forwarder)
+    if (forwarded == 0) {
+        ex->msgs_dropped += 1;
+        if (!presettled) {
+            //
+            // The delivery was not originally presettled and it was not
+            // forwarded to any destinations, return it to its original
+            // unsettled state.
+            //
+            in_delivery->settled = false;
+        }
+    } else {
+        ex->msgs_routed += 1;
+        if (in_delivery && !presettled) {
+            //
+            // The delivery was not presettled and it was forwarded to at least
+            // one destination.  Accept and settle the delivery only if the
+            // entire delivery has been received.
+            //
+            const bool receive_complete = qd_message_receive_complete(qdr_delivery_message(in_delivery));
+            if (receive_complete) {
+                in_delivery->disposition = PN_ACCEPTED;
+                qdr_delivery_push_CT(core, in_delivery);
+            }
+        }
+    }
+
+    return forwarded;
+}
+
+
+// callback from parse tree search:
+// handle = transmit_list containing all matching next_hops
+// pattern = pattern that matches the search (ignored)
+// payload = list of bindings configured for the pattern
+static bool gather_next_hops(void *handle, const char *pattern, void *payload)
+{
+    next_hop_list_t *transmit_list = (next_hop_list_t *)handle;
+    qdr_binding_list_t *bindings = (qdr_binding_list_t *)payload;
+
+    qdr_binding_t *binding = DEQ_HEAD(*bindings);
+    while (binding) {
+        binding->msgs_matched += 1;
+        // note - since multiple bindings may reference the next hop, it is
+        // possible a next hop has already been added to the transmit list.
+        // do not re-add.  This is not thread safe but that is fine since all
+        // forwarding is done on the core thread.
+        if (!binding->next_hop->on_xmit_list) {
+            DEQ_INSERT_TAIL_N(transmit_list, *transmit_list, binding->next_hop);
+            binding->next_hop->on_xmit_list = true;
+        }
+        binding = DEQ_NEXT_N(tree_list, binding);
+    }
+    return true;  // keep searching
+}
+
+
+// Forward a copy of the message to the to_addr address
+static int send_message(qdr_core_t     *core,
+                        next_hop_t     *next_hop,
+                        qd_message_t   *msg,
+                        qdr_delivery_t *in_delivery,
+                        bool            exclude_inprocess,
+                        bool            control)
+{
+    int count = 0;
+    qd_message_t *copy = qd_message_copy(msg);
+
+    qd_log(core->log, QD_LOG_TRACE, "Exchange '%s' forwarding message to '%s'",
+           next_hop->exchange->name, next_hop->next_hop);
+
+    // set "to override" and "phase" message annotations based on the next hop
+    qd_composed_field_t *to_field = qd_compose_subfield(0);
+    qd_compose_insert_string(to_field, (char *)next_hop->next_hop);
+    qd_message_set_to_override_annotation(copy, to_field);  // frees to_field
+    qd_message_set_phase_annotation(copy, next_hop->phase);
+
+    count = qdr_forward_message_CT(core, next_hop->qdr_addr, copy, in_delivery, exclude_inprocess, control);
+    qd_message_free(copy);
+
+    return count;
+}
+
+
+long qdr_exchange_binding_count(const qdr_exchange_t *ex)
+{
+    return (long) DEQ_SIZE(ex->bindings);
+}
+
+
+qdr_address_t *qdr_exchange_alternate_addr(const qdr_exchange_t *ex)
+{
+    return (ex->alternate) ? ex->alternate->qdr_addr : NULL;
+}
+
+
+/////////////////////////////
+// Exchange Management API //
+/////////////////////////////
+
+#define QDR_CONFIG_EXCHANGE_NAME          0
+#define QDR_CONFIG_EXCHANGE_IDENTITY      1
+#define QDR_CONFIG_EXCHANGE_ADDRESS       2
+#define QDR_CONFIG_EXCHANGE_PHASE         3
+#define QDR_CONFIG_EXCHANGE_ALTERNATE     4
+#define QDR_CONFIG_EXCHANGE_ALT_PHASE     5
+#define QDR_CONFIG_EXCHANGE_MATCH_METHOD  6
+#define QDR_CONFIG_EXCHANGE_BINDING_COUNT 7
+#define QDR_CONFIG_EXCHANGE_RECEIVED      8
+#define QDR_CONFIG_EXCHANGE_DROPPED       9
+#define QDR_CONFIG_EXCHANGE_FORWARDED     10
+#define QDR_CONFIG_EXCHANGE_DIVERTED      11
+
+const char *qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_COLUMN_COUNT + 1] =
+    {"name",
+     "identity",
+     "address",
+     "phase",
+     "alternateAddress",
+     "alternatePhase",
+     "matchMethod",
+     "bindingCount",
+     "receivedCount",
+     "droppedCount",
+     "forwardedCount",
+     "divertedCount",
+     0};
+
+// from management_agent.c
+extern const unsigned char *config_exchange_entity_type;
+
+#define QDR_CONFIG_BINDING_NAME         0
+#define QDR_CONFIG_BINDING_IDENTITY     1
+#define QDR_CONFIG_BINDING_EXCHANGE     2
+#define QDR_CONFIG_BINDING_KEY          3
+#define QDR_CONFIG_BINDING_NEXTHOP      4
+#define QDR_CONFIG_BINDING_NHOP_PHASE   5
+#define QDR_CONFIG_BINDING_MATCHED      6
+
+const char *qdr_config_binding_columns[QDR_CONFIG_BINDING_COLUMN_COUNT + 1] =
+    {"name",
+     "identity",
+     "exchangeName",
+     "bindingKey",
+     "nextHopAddress",
+     "nextHopPhase",
+     "matchedCount",
+     0};
+
+// from management_agent.c
+extern const unsigned char *config_binding_entity_type;
+
+
+// called on core shutdown to release all exchanges
+//
+void qdr_exchange_free_all(qdr_core_t *core)
+{
+    qdr_exchange_t *ex = DEQ_HEAD(core->exchanges);
+    while (ex) {
+        qdr_exchange_t *next = DEQ_NEXT(ex);
+        qdr_exchange_free(ex);
+        ex = next;
+    }
+}
+
+
+// Exchange CREATE
+//
+//
+void qdra_config_exchange_create_CT(qdr_core_t         *core,
+                                    qd_iterator_t      *name,
+                                    qdr_query_t        *query,
+                                    qd_parsed_field_t  *in_body)
+{
+    qdr_exchange_t *ex = NULL;
+
+    query->status = QD_AMQP_BAD_REQUEST;
+
+    if (!qd_parse_is_map(in_body)) {
+        query->status.description = "Body of request must be a map";
+        goto exit;
+    }
+
+    if (!name) {
+        query->status.description = "exchange requires a unique name";
+        goto exit;
+    }
+
+    qd_parsed_field_t *address_field = qd_parse_value_by_key(in_body,
+                                                             qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ADDRESS]);
+    if (!address_field) {
+        query->status.description = "exchange address is mandatory";
+        goto exit;
+    }
+    qd_iterator_t *address = qd_parse_raw(address_field);
+
+    // check for duplicates
+    {
+        qdr_exchange_t *eptr = 0;
+        for (eptr = DEQ_HEAD(core->exchanges); eptr; eptr = DEQ_NEXT(eptr)) {
+            if (qd_iterator_equal(address, eptr->address)) {
+                query->status.description = "duplicate exchange address";
+                goto exit;
+            } else if (qd_iterator_equal(name, eptr->name)) {
+                query->status.description = "duplicate exchange name";
+                goto exit;
+            }
+        }
+    }
+
+    qd_parsed_field_t *method_field = qd_parse_value_by_key(in_body,
+                                                            qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_MATCH_METHOD]);
+    qd_parse_tree_type_t method = QD_PARSE_TREE_AMQP_0_10;
+    if (method_field) {
+        if (qd_iterator_equal(qd_parse_raw(method_field), (const unsigned char *)"mqtt")) {
+            method = QD_PARSE_TREE_MQTT;
+        } else if (!qd_iterator_equal(qd_parse_raw(method_field), (const unsigned char *)"amqp")) {
+            query->status.description = "Exchange matchMethod must be either 'amqp' or 'mqtt'";
+            goto exit;
+        }
+    }
+
+    int phase = 0;
+    qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body,
+                                                           qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_PHASE]);
+    if (phase_field) {
+        phase = qd_parse_as_int(phase_field);
+        if (phase < 0 || phase > 9) {
+            query->status.description = "phase must be in the range 0-9";
+            goto exit;
+        }
+    }
+
+    qd_iterator_t *alternate = NULL;
+    int alt_phase = 0;
+    qd_parsed_field_t *alternate_field = qd_parse_value_by_key(in_body,
+                                                               qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALTERNATE]);
+    if (alternate_field) {
+        alternate = qd_parse_raw(alternate_field);
+        qd_parsed_field_t *alt_phase_field = qd_parse_value_by_key(in_body,
+                                                                   qdr_config_exchange_columns[QDR_CONFIG_EXCHANGE_ALT_PHASE]);
+        if (alt_phase_field) {
+            alt_phase = qd_parse_as_int(alt_phase_field);
+            if (alt_phase < 0 || alt_phase > 9) {
+                query->status.description = "phase must be in the range 0-9";
+                goto exit;
+            }
+        }
+    }
+
+    ex = qdr_exchange(core, name, address, phase, alternate, alt_phase, method);
+    if (ex) {
+        // @TODO(kgiusti) - for now, until the behavior is nailed down:
+        static int warn_user;
+        if (!warn_user) {
+            warn_user = 1;
+            qd_log(core->agent_log, QD_LOG_WARNING,
+                   "The Exchange/Binding feature is currently EXPERIMENTAL."
+                   " Its functionality may change in future releases"
+                   " of the Qpid Dispatch Router. Backward compatibility is"
+                   " not guaranteed.");
+        }
+        query->status = QD_AMQP_CREATED;
+        if (query->body) {
+            write_config_exchange_map(ex, query->body);
+        }
+    } else {
+        query->status.description = "failed to allocate exchange";
+    }
+
+ exit:
+
+    if (query->status.status == QD_AMQP_CREATED.status) {
+        qd_log(core->agent_log, QD_LOG_DEBUG,
+               "Exchange %s CREATED (id=%"PRIu64")", ex->name, ex->identity);
+
+    } else {
+        qd_log(core->agent_log, QD_LOG_ERROR,
+               "Error performing CREATE of %s: %s", config_exchange_entity_type, query->status.description);
+        // return a NULL body:
+        if (query->body) qd_compose_insert_null(query->body);
+    }
+
+    if (query->body) {
+        qdr_agent_enqueue_response_CT(core, query);
+    } else {
+        // no body == create from internal config parser
+        qdr_query_free(query);
+    }
+}
+
+
+// Exchange DELETE:
+//
+void qdra_config_exchange_delete_CT(qdr_core_t    *core,
+                                    qdr_query_t   *query,
+                                    qd_iterator_t *name,
+                                    qd_iterator_t *identity)
+{
+    qdr_exchange_t *ex = 0;
+
+    if (!name && !identity) {
+        query->status = QD_AMQP_BAD_REQUEST;
+        query->status.description = "No name or identity provided";
+        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing DELETE of %s: %s",
+               config_exchange_entity_type, query->status.description);
+    } else {
+        ex = find_exchange(core, identity, name);
+        if (ex) {
+            qd_log(core->agent_log, QD_LOG_DEBUG,
+                   "Exchange %s DELETED (id=%"PRIu64")", ex->name, ex->identity);
+            qdr_exchange_free(ex);
+            query->status = QD_AMQP_NO_CONTENT;
+        } else
+            query->status = QD_AMQP_NOT_FOUND;
+    }
+
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+// Exchange GET
+//
+void qdra_config_exchange_get_CT(qdr_core_t    *core,
+                                 qd_iterator_t *name,
+                                 qd_iterator_t *identity,
+                                 qdr_query_t   *query,
+                                 const char    *columns[])
+{
+    qdr_exchange_t *ex = 0;
+
+    if (!name && !identity) {
+        query->status = QD_AMQP_BAD_REQUEST;
+        query->status.description = "No name or identity provided";
+        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: %s",
+               config_exchange_entity_type, query->status.description);
+    }
+    else {
+        ex = find_exchange(core, identity, name);
+        if (!ex) {
+            query->status = QD_AMQP_NOT_FOUND;
+        }
+        else {
+            if (query->body) write_config_exchange_map(ex, query->body);
+            query->status = QD_AMQP_OK;
+        }
+    }
+
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Exchange GET first:
+//
+void qdra_config_exchange_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
+{
+    //
+    // Queries that get this far will always succeed.
+    //
+    query->status = QD_AMQP_OK;
+
+    //
+    // If the offset goes beyond the set of objects, end the query now.
+    //
+    if (offset >= DEQ_SIZE(core->exchanges)) {
+        query->more = false;
+        qdr_agent_enqueue_response_CT(core, query);
+        return;
+    }
+
+    //
+    // Run to the object at the offset.
+    //
+    qdr_exchange_t *ex = DEQ_HEAD(core->exchanges);
+    for (int i = 0; i < offset; i++)
+        ex = DEQ_NEXT(ex);
+    assert(ex);
+
+    //
+    // Write the columns of the object into the response body.
+    //
+    if (query->body) write_config_exchange_list(ex, query);
+
+    //
+    // Advance to the next address
+    //
+    query->next_offset = offset + 1;
+    query->more = !!DEQ_NEXT(ex);
+
+    //
+    // Enqueue the response.
+    //
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+// Exchange GET-NEXT
+//
+void qdra_config_exchange_get_next_CT(qdr_core_t *core, qdr_query_t *query)
+{
+    qdr_exchange_t *ex = 0;
+
+    if (query->next_offset < DEQ_SIZE(core->exchanges)) {
+        ex = DEQ_HEAD(core->exchanges);
+        for (int i = 0; i < query->next_offset && ex; i++)
+            ex = DEQ_NEXT(ex);
+    }
+
+    if (ex) {
+        //
+        // Write the columns of the addr entity into the response body.
+        //
+        if (query->body) write_config_exchange_list(ex, query);
+
+        //
+        // Advance to the next object
+        //
+        query->next_offset++;
+        query->more = !!DEQ_NEXT(ex);
+    } else
+        query->more = false;
+
+    //
+    // Enqueue the response.
+    //
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Binding CREATE
+void qdra_config_binding_create_CT(qdr_core_t         *core,
+                                   qd_iterator_t      *name,
+                                   qdr_query_t        *query,
+                                   qd_parsed_field_t  *in_body)
+{
+    qdr_binding_t *binding = NULL;
+    qdr_exchange_t *ex = NULL;
+    qd_iterator_t *key = NULL;
+
+    query->status = QD_AMQP_BAD_REQUEST;
+
+    if (!qd_parse_is_map(in_body)) {
+        query->status.description = "Body of request must be a map";
+        goto exit;
+    }
+
+    qd_parsed_field_t *exchange_field = qd_parse_value_by_key(in_body,
+                                                              qdr_config_binding_columns[QDR_CONFIG_BINDING_EXCHANGE]);
+    if (!exchange_field) {
+        query->status.description = "Binding configuration requires an exchange";
+        goto exit;
+    }
+
+    // lookup the exchange by its name:
+    ex = find_exchange(core, NULL, qd_parse_raw(exchange_field));
+    if (!ex) {
+        query->status.description = "Named exchange does not exist";
+        goto exit;
+    }
+
+    qd_parsed_field_t *next_hop_field = qd_parse_value_by_key(in_body,
+                                                              qdr_config_binding_columns[QDR_CONFIG_BINDING_NEXTHOP]);
+    if (!next_hop_field) {
+        query->status.description = "No next hop specified";
+        goto exit;
+    }
+    qd_iterator_t *nhop = qd_parse_raw(next_hop_field);
+
+    qd_parsed_field_t *key_field = qd_parse_value_by_key(in_body,
+                                                         qdr_config_binding_columns[QDR_CONFIG_BINDING_KEY]);
+    // if no pattern given, assume match all "#":
+    key = key_field ? qd_iterator_dup(qd_parse_raw(key_field)) : qd_iterator_string("#", ITER_VIEW_ALL);
+
+    if (!qd_parse_tree_validate_pattern(ex->parse_tree, key)) {
+        query->status.description = "The binding key pattern is invalid";
+        goto exit;
+    }
+
+    qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body,
+                                                         qdr_config_binding_columns[QDR_CONFIG_BINDING_NHOP_PHASE]);
+    int phase = (phase_field ? qd_parse_as_int(phase_field) : 0);
+    if (phase < 0 || phase > 9) {
+        query->status.description = "phase must be in the range 0-9";
+        goto exit;
+    }
+
+    // check for duplicates: the name and the tuple (key, next hop, phase) must
+    // be unique per exchange
+
+    for (qdr_binding_t *b = DEQ_HEAD(ex->bindings); b; b = DEQ_NEXT_N(exchange_list, b)) {
+        if (name && b->name && qd_iterator_equal(name, b->name)) {
+            query->status.description = "Duplicate next hop name";
+            goto exit;
+        } else if (qd_iterator_equal(key, b->key) &&
+                   qd_iterator_equal(nhop, b->next_hop->next_hop) &&
+                   phase == b->next_hop->phase) {
+            query->status.description = "Next hop for key already exists";
+            goto exit;
+        }
+    }
+
+    binding = qdr_binding(ex, name, key, nhop, phase);
+    if (binding) {
+        query->status = QD_AMQP_CREATED;
+        if (query->body) {
+            write_config_binding_map(binding, query->body);
+        }
+    } else {
+        query->status.description = "Failed to allocate next hop";
+    }
+
+
+ exit:
+
+    if (query->status.status == QD_AMQP_CREATED.status) {
+        qd_log(core->agent_log, QD_LOG_DEBUG,
+               "Exchange %s Binding %s -> %s CREATED (id=%"PRIu64")", ex->name,
+               binding->key, binding->next_hop->next_hop, binding->identity);
+    } else {
+        qd_log(core->agent_log, QD_LOG_ERROR,
+               "Error performing CREATE of %s: %s",
+               config_binding_entity_type,
+               query->status.description);
+        // return a NULL body:
+        if (query->body) qd_compose_insert_null(query->body);
+    }
+
+    if (query->body) {
+        qdr_agent_enqueue_response_CT(core, query);
+    } else {
+        // no body == create from internal config parser
+        qdr_query_free(query);
+    }
+
+    if (key) qd_iterator_free(key);
+}
+
+
+// Binding DELETE
+//
+void qdra_config_binding_delete_CT(qdr_core_t    *core,
+                                   qdr_query_t   *query,
+                                   qd_iterator_t *name,
+                                   qd_iterator_t *identity)
+{
+    if (!identity && !name) {
+        query->status = QD_AMQP_BAD_REQUEST;
+        query->status.description = "No binding name or identity provided";
+        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing DELETE of %s: %s",
+               config_binding_entity_type, query->status.description);
+    } else {
+        qdr_binding_t *binding = find_binding(core, identity, name);
+        if (!binding) {
+            query->status = QD_AMQP_NOT_FOUND;
+        } else {
+            qd_log(core->agent_log, QD_LOG_DEBUG,
+                   "Binding %s -> %s on exchange %s DELETED (id=%"PRIu64")",
+                   binding->key,
+                   binding->next_hop->next_hop,
+                   binding->exchange->name,
+                   binding->identity);
+            qdr_binding_free(binding);
+            query->status = QD_AMQP_NO_CONTENT;
+        }
+    }
+
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Binding GET
+//
+void qdra_config_binding_get_CT(qdr_core_t    *core,
+                                qd_iterator_t *name,
+                                qd_iterator_t *identity,
+                                qdr_query_t   *query,
+                                const char    *columns[])
+{
+    if (!identity && !name) {
+        query->status = QD_AMQP_BAD_REQUEST;
+        query->status.description = "No binding name or identity provided";
+        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: %s",
+               config_binding_entity_type, query->status.description);
+    } else {
+        qdr_binding_t *binding = find_binding(core, identity, name);
+        if (binding == 0) {
+            query->status = QD_AMQP_NOT_FOUND;
+        }
+        else {
+            if (query->body) write_config_binding_map(binding, query->body);
+            query->status = QD_AMQP_OK;
+        }
+    }
+
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Binding GET first
+//
+void qdra_config_binding_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
+{
+    query->status = QD_AMQP_OK;
+
+    qdr_binding_t *binding = get_binding_at_index(core, offset);
+    if (!binding) {
+        query->more = false;
+        qdr_agent_enqueue_response_CT(core, query);
+        return;
+    }
+
+    if (query->body) write_config_binding_list(binding, query);
+    query->next_offset = offset + 1;
+    query->more = !!(DEQ_NEXT_N(exchange_list, binding) || DEQ_NEXT(binding->exchange));
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Binding GET-NEXT
+//
+void qdra_config_binding_get_next_CT(qdr_core_t *core, qdr_query_t *query)
+{
+    qdr_binding_t *binding = get_binding_at_index(core, query->next_offset);
+    if (binding) {
+        if (query->body) write_config_binding_list(binding, query);
+        query->next_offset++;
+        query->more = !!(DEQ_NEXT_N(exchange_list, binding) || DEQ_NEXT(binding->exchange));
+    } else
+        query->more = false;
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+// Exchange constructor/destructor
+static qdr_exchange_t *qdr_exchange(qdr_core_t *core,
+                                    qd_iterator_t *name,
+                                    qd_iterator_t *address,
+                                    int            phase,
+                                    qd_iterator_t *alternate,
+                                    int            alt_phase,
+                                    qd_parse_tree_type_t method)
+{
+    assert(address);
+    qdr_exchange_t *ex = new_qdr_exchange_t();
+    if (ex) {
+        ZERO(ex);
+        DEQ_ITEM_INIT(ex);
+        ex->core = core;
+        ex->identity = qdr_identifier(core);
+        ex->name = qd_iterator_copy(name);
+        ex->address = qd_iterator_copy(address);
+        ex->phase = phase;
+        ex->parse_tree = qd_parse_tree_new(method);
+        DEQ_INIT(ex->bindings);
+        DEQ_INIT(ex->next_hops);
+
+        qd_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
+        qd_iterator_annotate_phase(address, (char) phase + '0');
+        qd_hash_retrieve(core->addr_hash, address, (void **)&ex->qdr_addr);
+        if (!ex->qdr_addr) {
+            ex->qdr_addr = qdr_address_CT(core, qdr_treatment_for_address_hash_CT(core,
+                                                                                  address));
+            qd_hash_insert(core->addr_hash, address, ex->qdr_addr, &ex->qdr_addr->hash_handle);
+            DEQ_INSERT_TAIL(core->addrs, ex->qdr_addr);
+        }
+
+        // we're going to override the forwarder
+        qdr_forwarder_t *old = ex->qdr_addr->forwarder;
+        qdr_forwarder_t *new = qdr_new_forwarder(qdr_forward_exchange_CT,
+                                                 old ? old->forward_attach : 0,
+                                                 old ? old->bypass_valid_origins: false);
+        ex->old_forwarder = old;
+        ex->qdr_addr->forwarder = new;
+        ex->qdr_addr->ref_count += 1;
+        ex->qdr_addr->exchange = ex;
+        DEQ_INSERT_TAIL(core->exchanges, ex);
+
+        if (alternate) {
+            ex->alternate = next_hop(ex, alternate, alt_phase);
+        }
+
+        qdr_post_mobile_added_CT(core,
+                                 (const char*) qd_hash_key_by_handle(ex->qdr_addr->hash_handle));
+    }
+
+    return ex;
+}
+
+static void qdr_exchange_free(qdr_exchange_t *ex)
+{
+    if (ex->core->running && DEQ_SIZE(ex->qdr_addr->rlinks) == 0) {
+        qdr_post_mobile_removed_CT(ex->core,
+                                   (const char*) qd_hash_key_by_handle(ex->qdr_addr->hash_handle));
+    }
+
+    DEQ_REMOVE(ex->core->exchanges, ex);
+    while (DEQ_SIZE(ex->bindings) > 0) {
+        // freeing the binding removes it from the binding list
+        // and the parse tree
+        qdr_binding_free(DEQ_HEAD(ex->bindings));
+    }
+    if (ex->alternate) {
+        next_hop_release(ex->alternate);
+    }
+    assert(DEQ_IS_EMPTY(ex->next_hops));
+
+    free(ex->qdr_addr->forwarder);
+    ex->qdr_addr->forwarder = ex->old_forwarder;
+    assert(ex->qdr_addr->ref_count > 0);
+    ex->qdr_addr->ref_count -= 1;
+    qdr_check_addr_CT(ex->core, ex->qdr_addr, false); // @TODO(kgiusti) ?is
+                                                      // false correct ?
+    free(ex->name);
+    free(ex->address);
+
+    qd_parse_tree_free(ex->parse_tree);
+    free_qdr_exchange_t(ex);
+}
+
+// Binding constructor/destructor
+
+static qdr_binding_t *qdr_binding(qdr_exchange_t *ex,
+                                  qd_iterator_t *name,
+                                  qd_iterator_t *key,
+                                  qd_iterator_t *nhop,
+                                  int            phase)
+{
+    qdr_binding_t *b = new_qdr_binding_t();
+    if (b) {
+        ZERO(b);
+        DEQ_ITEM_INIT_N(exchange_list, b);
+        DEQ_ITEM_INIT_N(tree_list, b);
+
+        b->name = qd_iterator_copy(name);
+        b->identity = qdr_identifier(ex->core);
+        b->exchange = ex;
+        b->key = qd_iterator_copy(key);
+        b->next_hop = next_hop(ex, nhop, phase);
+
+        qdr_binding_list_t  *bindings = NULL;
+        if (!qd_parse_tree_get_pattern(ex->parse_tree, key, (void **)&bindings)) {
+            // new pattern
+            bindings = malloc(sizeof(*bindings));
+            DEQ_INIT(*bindings);
+            qd_parse_tree_add_pattern(ex->parse_tree, key, bindings);
+        }
+        assert(bindings);
+        DEQ_INSERT_TAIL_N(tree_list, *bindings, b);
+        DEQ_INSERT_TAIL_N(exchange_list, ex->bindings, b);
+    }
+    return b;
+}
+
+
+static void qdr_binding_free(qdr_binding_t *b)
+{
+    qdr_binding_list_t *bindings = NULL;
+    qdr_exchange_t *ex = b->exchange;
+
+    qd_iterator_t *k_iter = qd_iterator_string((char *)b->key,
+                                               ITER_VIEW_ALL);
+    if (qd_parse_tree_get_pattern(ex->parse_tree, k_iter, (void **)&bindings)) {
+        assert(bindings && !DEQ_IS_EMPTY(*bindings));
+        DEQ_REMOVE_N(tree_list, *bindings, b);
+        if (DEQ_IS_EMPTY(*bindings)) {
+            qd_parse_tree_remove_pattern(ex->parse_tree, k_iter);
+            free(bindings);
+        }
+    }
+    qd_iterator_free(k_iter);
+
+    DEQ_REMOVE_N(exchange_list, b->exchange->bindings, b);
+    free(b->name);
+    free(b->key);
+    next_hop_release(b->next_hop);
+    free_qdr_binding_t(b);
+}
+
+
+// Next Hop constructor/destructor
+
+static next_hop_t *next_hop(qdr_exchange_t *ex,
+                            qd_iterator_t  *address,
+                            int             phase)
+{
+    next_hop_t *nh = find_next_hop(ex, address, phase);
+    if (!nh) {
+        nh = new_next_hop_t();
+        if (!nh) return NULL;
+        ZERO(nh);
+        DEQ_ITEM_INIT_N(exchange_list, nh);
+        DEQ_ITEM_INIT_N(transmit_list, nh);
+        nh->exchange = ex;
+        nh->next_hop = qd_iterator_copy(address);
+        nh->phase = phase;
+
+        qd_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
+        qd_iterator_annotate_phase(address, (char) phase + '0');
+        qd_hash_retrieve(ex->core->addr_hash, address, (void **)&nh->qdr_addr);
+        if (!nh->qdr_addr) {
+            qdr_core_t *core = ex->core;
+            nh->qdr_addr = qdr_address_CT(core,
+                                          qdr_treatment_for_address_hash_CT(core,
+                                                                            address));
+            qd_hash_insert(core->addr_hash, address, nh->qdr_addr, &nh->qdr_addr->hash_handle);
+            DEQ_INSERT_TAIL(core->addrs, nh->qdr_addr);
+        }
+        nh->qdr_addr->ref_count += 1;
+        DEQ_INSERT_TAIL_N(exchange_list, ex->next_hops, nh);
+    }
+    nh->ref_count += 1;         // for caller's reference
+    return nh;
+}
+
+
+static void next_hop_release(next_hop_t *nh)
+{
+    assert(nh->ref_count > 0);
+    if (--nh->ref_count == 0) {
+        assert(nh->qdr_addr->ref_count > 0);
+        if (--nh->qdr_addr->ref_count == 0) {
+            // @TODO(kgiusti) is 'false' ok?
+            qdr_check_addr_CT(nh->exchange->core, nh->qdr_addr, false);
+        }
+        DEQ_REMOVE_N(exchange_list, nh->exchange->next_hops, nh);
+        assert(!nh->on_xmit_list);
+        free(nh->next_hop);
+        free_next_hop_t(nh);
+    }
+}
+
+
+// lookup
+
+static qdr_exchange_t *find_exchange(qdr_core_t *core, qd_iterator_t *identity, qd_iterator_t *name)
+{
+    qdr_exchange_t *ex = 0;
+    for (ex = DEQ_HEAD(core->exchanges); ex; ex = DEQ_NEXT(ex)) {
+        if (identity) {  // ignore name - identity takes precedence
+            // Convert the identity for comparison
+            char id[100];
+            snprintf(id, 100, "%"PRId64, ex->identity);
+            if (qd_iterator_equal(identity, (const unsigned char*) id))
+                break;
+        } else if (name && qd_iterator_equal(name, ex->name))
+            break;
+    }
+    return ex;
+}
+
+
+static qdr_binding_t *find_binding(qdr_core_t *core, qd_iterator_t *identity, qd_iterator_t *name)
+{
+    for (qdr_exchange_t *ex = DEQ_HEAD(core->exchanges); ex; ex = DEQ_NEXT(ex)) {
+        for (qdr_binding_t *binding = DEQ_HEAD(ex->bindings); binding; binding = DEQ_NEXT_N(exchange_list, binding)) {
+            if (identity) {  // ignore name - identity takes precedence
+                // Convert the identity for comparison
+                char id[100];
+                snprintf(id, 100, "%"PRId64, binding->identity);
+                if (qd_iterator_equal(identity, (const unsigned char*) id))
+                    return binding;
+            } else if (name && qd_iterator_equal(name, binding->name))
+                return binding;
+        }
+    }
+    return NULL;
+}
+
+
+static next_hop_t *find_next_hop(qdr_exchange_t *ex,
+                                 qd_iterator_t  *address,
+                                 int             phase)
+{
+    next_hop_t *nh = DEQ_HEAD(ex->next_hops);
+    DEQ_FIND_N(exchange_list, nh, (phase == nh->phase) && qd_iterator_equal(address, nh->next_hop));
+    return nh;
+}
+
+
+// Management helper routines
+
+static void exchange_insert_column(qdr_exchange_t *ex, int col, qd_composed_field_t *body)
+{
+    switch(col) {
+    case QDR_CONFIG_EXCHANGE_NAME:
+        qd_compose_insert_string(body, (const char *)ex->name);
+        break;
+
+    case QDR_CONFIG_EXCHANGE_IDENTITY: {
+        char id_str[100];
+        snprintf(id_str, 100, "%"PRId64, ex->identity);
+        qd_compose_insert_string(body, id_str);
+        break;
+    }
+
+    case QDR_CONFIG_EXCHANGE_ADDRESS:
+        qd_compose_insert_string(body, (const char *)ex->address);
+        break;
+
+    case QDR_CONFIG_EXCHANGE_PHASE:
+        qd_compose_insert_int(body, ex->phase);
+        break;
+
+    case QDR_CONFIG_EXCHANGE_ALTERNATE:
+        if (ex->alternate && ex->alternate->next_hop)
+            qd_compose_insert_string(body, (const char *)ex->alternate->next_hop);
+        else
+            qd_compose_insert_null(body);
+        break;
+
+
+    case QDR_CONFIG_EXCHANGE_ALT_PHASE:
+        if (ex->alternate)
+            qd_compose_insert_int(body, ex->alternate->phase);
+        else
+            qd_compose_insert_null(body);
+        break;
+
+    case QDR_CONFIG_EXCHANGE_MATCH_METHOD:
+        switch (qd_parse_tree_type(ex->parse_tree)) {
+        case QD_PARSE_TREE_AMQP_0_10:
+            qd_compose_insert_string(body, "amqp");
+            break;
+        case QD_PARSE_TREE_MQTT:
+            qd_compose_insert_string(body, "mqtt");
+            break;
+        default:
+            break;
+        }
+        break;
+
+    case QDR_CONFIG_EXCHANGE_BINDING_COUNT:
+        qd_compose_insert_uint(body, DEQ_SIZE(ex->bindings));
+        break;
+
+    case QDR_CONFIG_EXCHANGE_RECEIVED:
+        qd_compose_insert_ulong(body, ex->msgs_received);
+        break;
+
+    case QDR_CONFIG_EXCHANGE_DROPPED:
+        qd_compose_insert_ulong(body, ex->msgs_dropped);
+        break;
+
+    case QDR_CONFIG_EXCHANGE_FORWARDED:
+        qd_compose_insert_ulong(body, ex->msgs_routed);
+        break;
+
+    case QDR_CONFIG_EXCHANGE_DIVERTED:
+        qd_compose_insert_ulong(body, ex->msgs_alternate);
+        break;
+    }
+}
+
+
+static void binding_insert_column(qdr_binding_t *b, int col, qd_composed_field_t *body)
+{
+    switch(col) {
+    case QDR_CONFIG_BINDING_NAME:
+        if (b->name)
+            qd_compose_insert_string(body, (char *)b->name);
+        else
+            qd_compose_insert_null(body);
+        break;
+
+    case QDR_CONFIG_BINDING_IDENTITY: {
+        char id_str[100];
+        snprintf(id_str, 100, "%"PRIu64, b->identity);
+        qd_compose_insert_string(body, id_str);
+        break;
+    }
+
+    case QDR_CONFIG_BINDING_EXCHANGE:
+        qd_compose_insert_string(body, (char *)b->exchange->name);
+        break;
+
+    case QDR_CONFIG_BINDING_KEY:
+        qd_compose_insert_string(body, (char *)b->key);
+        break;
+
+    case QDR_CONFIG_BINDING_NEXTHOP:
+        assert(b->next_hop && b->next_hop->next_hop);
+        qd_compose_insert_string(body, (char *)b->next_hop->next_hop);
+        break;
+
+    case QDR_CONFIG_BINDING_NHOP_PHASE:
+        assert(b->next_hop);
+        qd_compose_insert_int(body, b->next_hop->phase);
+        break;
+
+    case QDR_CONFIG_BINDING_MATCHED:
+        qd_compose_insert_ulong(body, b->msgs_matched);
+        break;
+    }
+}
+
+
+static void write_config_exchange_map(qdr_exchange_t      *ex,
+                                      qd_composed_field_t *body)
+{
+    qd_compose_start_map(body);
+
+    for(int i = 0; i < QDR_CONFIG_EXCHANGE_COLUMN_COUNT; i++) {
+        qd_compose_insert_string(body, qdr_config_exchange_columns[i]);
+        exchange_insert_column(ex, i, body);
+    }
+
+    qd_compose_end_map(body);
+}
+
+
+static void write_config_exchange_list(qdr_exchange_t *ex,
+                                       qdr_query_t    *query)
+{
+    qd_compose_start_list(query->body);
+
+    int i = 0;
+    while (query->columns[i] >= 0) {
+        exchange_insert_column(ex, query->columns[i], query->body);
+        i++;
+    }
+
+    qd_compose_end_list(query->body);
+}
+
+
+static void write_config_binding_map(qdr_binding_t       *binding,
+                                     qd_composed_field_t *body)
+{
+    qd_compose_start_map(body);
+
+    for(int i = 0; i < QDR_CONFIG_BINDING_COLUMN_COUNT; i++) {
+        qd_compose_insert_string(body, qdr_config_binding_columns[i]);
+        binding_insert_column(binding, i, body);
+    }
+
+    qd_compose_end_map(body);
+}
+
+static void write_config_binding_list(qdr_binding_t *binding,
+                                      qdr_query_t   *query)
+{
+    qd_compose_start_list(query->body);
+
+    int i = 0;
+    while (query->columns[i] >= 0) {
+        binding_insert_column(binding, query->columns[i], query->body);
+        i++;
+    }
+
+    qd_compose_end_list(query->body);
+}
+
+
+static qdr_binding_t *get_binding_at_index(qdr_core_t *core, int index)
+{
+    qdr_binding_t *binding = 0;
+
+    // skip to the proper exchange:
+    qdr_exchange_t *ex = DEQ_HEAD(core->exchanges);
+    while (ex && index >= DEQ_SIZE(ex->bindings)) {
+        index -= DEQ_SIZE(ex->bindings);
+        ex = DEQ_NEXT(ex);
+    }
+
+    if (ex) {
+        // then to the target binding
+        assert(index < DEQ_SIZE(ex->bindings));
+        binding = DEQ_HEAD(ex->bindings);
+        while (index--) {
+            binding = DEQ_NEXT_N(exchange_list, binding);
+        }
+    }
+    return binding;
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_core/exchange_bindings.h
----------------------------------------------------------------------
diff --git a/src/router_core/exchange_bindings.h b/src/router_core/exchange_bindings.h
new file mode 100644
index 0000000..ef802af
--- /dev/null
+++ b/src/router_core/exchange_bindings.h
@@ -0,0 +1,70 @@
+#ifndef EXCHANGE_BINDINGS_H
+#define EXCHANGE_BINDINGS_H 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+#define QDR_CONFIG_EXCHANGE_COLUMN_COUNT 12
+#define QDR_CONFIG_BINDING_COLUMN_COUNT  7
+
+extern const char *qdr_config_exchange_columns[];
+extern const char *qdr_config_binding_columns[];
+
+void qdr_exchange_free_all(qdr_core_t *core);
+long qdr_exchange_binding_count(const qdr_exchange_t *ex);
+qdr_address_t *qdr_exchange_alternate_addr(const qdr_exchange_t *ex);
+
+// management:
+void qdra_config_exchange_create_CT(qdr_core_t         *core,
+                                    qd_iterator_t      *name,
+                                    qdr_query_t        *query,
+                                    qd_parsed_field_t  *in_body);
+void qdra_config_exchange_delete_CT(qdr_core_t    *core,
+                                    qdr_query_t   *query,
+                                    qd_iterator_t *name,
+                                    qd_iterator_t *identity);
+void qdra_config_exchange_get_CT(qdr_core_t    *core,
+                                 qd_iterator_t *name,
+                                 qd_iterator_t *identity,
+                                 qdr_query_t   *query,
+                                 const char    *columns[]);
+void qdra_config_exchange_get_first_CT(qdr_core_t  *core,
+                                       qdr_query_t *query,
+                                       int          offset);
+void qdra_config_exchange_get_next_CT(qdr_core_t  *core,
+                                      qdr_query_t *query);
+void qdra_config_binding_get_first_CT(qdr_core_t  *core,
+                                      qdr_query_t *query,
+                                      int          offset);
+void qdra_config_binding_get_next_CT(qdr_core_t  *core,
+                                     qdr_query_t *query);
+void qdra_config_binding_create_CT(qdr_core_t         *core,
+                                   qd_iterator_t      *name,
+                                   qdr_query_t        *query,
+                                   qd_parsed_field_t  *in_body);
+void qdra_config_binding_delete_CT(qdr_core_t    *core,
+                                   qdr_query_t   *query,
+                                   qd_iterator_t *name,
+                                   qd_iterator_t *identity);
+void qdra_config_binding_get_CT(qdr_core_t    *core,
+                                qd_iterator_t *name,
+                                qd_iterator_t *identity,
+                                qdr_query_t   *query,
+                                const char    *columns[]);
+#endif /* exchange_bindings.h */

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index ee66e0d..82bce7c 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -21,29 +21,8 @@
 #include <qpid/dispatch/amqp.h>
 #include <stdio.h>
 #include <strings.h>
+#include "forwarder.h"
 
-//
-// NOTE: If the in_delivery argument is NULL, the resulting out deliveries
-//       shall be pre-settled.
-//
-typedef int (*qdr_forward_message_t) (qdr_core_t      *core,
-                                      qdr_address_t   *addr,
-                                      qd_message_t    *msg,
-                                      qdr_delivery_t  *in_delivery,
-                                      bool             exclude_inprocess,
-                                      bool             control);
-
-typedef bool (*qdr_forward_attach_t) (qdr_core_t     *core,
-                                      qdr_address_t  *addr,
-                                      qdr_link_t     *link,
-                                      qdr_terminus_t *source,
-                                      qdr_terminus_t *target);
-
-struct qdr_forwarder_t {
-    qdr_forward_message_t forward_message;
-    qdr_forward_attach_t  forward_attach;
-    bool                  bypass_valid_origins;
-};
 
 //==================================================================================
 // Built-in Forwarders
@@ -850,11 +829,12 @@ qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treat
 int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
                            bool exclude_inprocess, bool control)
 {
+    int fanout = 0;
     if (addr->forwarder)
-        return addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
+        fanout = addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
 
     // TODO - Deal with this delivery's disposition
-    return 0;
+    return fanout;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_core/forwarder.h
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.h b/src/router_core/forwarder.h
new file mode 100644
index 0000000..aea9afd
--- /dev/null
+++ b/src/router_core/forwarder.h
@@ -0,0 +1,48 @@
+#ifndef qd_router_core_forwarder
+#define qd_router_core_forwarder 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+//
+// NOTE: If the in_delivery argument is NULL, the resulting out deliveries
+//       shall be pre-settled.
+//
+typedef int (*qdr_forward_message_t) (qdr_core_t      *core,
+                                      qdr_address_t   *addr,
+                                      qd_message_t    *msg,
+                                      qdr_delivery_t  *in_delivery,
+                                      bool             exclude_inprocess,
+                                      bool             control);
+
+typedef bool (*qdr_forward_attach_t) (qdr_core_t     *core,
+                                      qdr_address_t  *addr,
+                                      qdr_link_t     *link,
+                                      qdr_terminus_t *source,
+                                      qdr_terminus_t *target);
+
+struct qdr_forwarder_t {
+    qdr_forward_message_t forward_message;
+    qdr_forward_attach_t  forward_attach;
+    bool                  bypass_valid_origins;
+};
+
+qdr_forwarder_t *qdr_new_forwarder(qdr_forward_message_t fm, qdr_forward_attach_t fa, bool bypass_valid_origins);
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
index 252a0fd..737a096 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -40,14 +40,16 @@ const char *IDENTITY = "identity";
 const char *OPERATION = "operation";
 const char *ATTRIBUTE_NAMES = "attributeNames";
 
-const unsigned char *config_address_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.config.address";
-const unsigned char *link_route_entity_type     = (unsigned char*) "org.apache.qpid.dispatch.router.config.linkRoute";
-const unsigned char *auto_link_entity_type      = (unsigned char*) "org.apache.qpid.dispatch.router.config.autoLink";
-const unsigned char *address_entity_type        = (unsigned char*) "org.apache.qpid.dispatch.router.address";
-const unsigned char *link_entity_type           = (unsigned char*) "org.apache.qpid.dispatch.router.link";
-const unsigned char *console_entity_type        = (unsigned char*) "org.apache.qpid.dispatch.console";
-const unsigned char *router_entity_type         = (unsigned char*) "org.apache.qpid.dispatch.router";
-const unsigned char *connection_entity_type     = (unsigned char*) "org.apache.qpid.dispatch.connection";
+const unsigned char *config_address_entity_type  = (unsigned char*) "org.apache.qpid.dispatch.router.config.address";
+const unsigned char *link_route_entity_type      = (unsigned char*) "org.apache.qpid.dispatch.router.config.linkRoute";
+const unsigned char *auto_link_entity_type       = (unsigned char*) "org.apache.qpid.dispatch.router.config.autoLink";
+const unsigned char *address_entity_type         = (unsigned char*) "org.apache.qpid.dispatch.router.address";
+const unsigned char *link_entity_type            = (unsigned char*) "org.apache.qpid.dispatch.router.link";
+const unsigned char *console_entity_type         = (unsigned char*) "org.apache.qpid.dispatch.console";
+const unsigned char *router_entity_type          = (unsigned char*) "org.apache.qpid.dispatch.router";
+const unsigned char *connection_entity_type      = (unsigned char*) "org.apache.qpid.dispatch.connection";
+const unsigned char *config_exchange_entity_type = (unsigned char*) "org.apache.qpid.dispatch.router.config.exchange";
+const unsigned char *config_binding_entity_type  = (unsigned char*) "org.apache.qpid.dispatch.router.config.binding";
 
 const char * const status_description = "statusDescription";
 const char * const correlation_id = "correlation-id";
@@ -432,6 +434,10 @@ static bool qd_can_handle_request(qd_parsed_field_t           *properties_fld,
         *entity_type = QD_ROUTER_FORBIDDEN;
     else if (qd_iterator_equal(qd_parse_raw(parsed_field), connection_entity_type))
         *entity_type = QD_ROUTER_CONNECTION;
+    else if (qd_iterator_equal(qd_parse_raw(parsed_field), config_exchange_entity_type))
+        *entity_type = QD_ROUTER_EXCHANGE;
+    else if (qd_iterator_equal(qd_parse_raw(parsed_field), config_binding_entity_type))
+        *entity_type = QD_ROUTER_BINDING;
     else
         return false;
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fff61db8/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index a6f015a..79f3d11 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -19,6 +19,7 @@
 
 #include "router_core_private.h"
 #include "route_control.h"
+#include "exchange_bindings.h"
 #include <stdio.h>
 #include <strings.h>
 
@@ -46,6 +47,7 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area,
     core->router_mode = mode;
     core->router_area = area;
     core->router_id   = id;
+    DEQ_INIT(core->exchanges);
 
     //
     // Set up the logging sources for the router core
@@ -135,6 +137,8 @@ void qdr_core_free(qdr_core_t *core)
         qdr_core_delete_link_route(core, link_route);
     }
 
+    qdr_exchange_free_all(core);
+
     qdr_address_t *addr = 0;
     while ( (addr = DEQ_HEAD(core->addrs)) ) {
         qdr_core_remove_address(core, addr);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org