You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/09/12 15:19:32 UTC

[1/3] qpid-broker-j git commit: QPID-7771: [Java Broker] Add support for binding argument 'x-replacement-routing-key'

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 6d7ac3680 -> 1abc93597


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index ef61fc6..a4a3823 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -25,38 +25,31 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.junit.Assert;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class TopicExchangeTest extends QpidTestCase
 {
 
-    private TopicExchangeImpl _exchange;
+    private TopicExchange<?> _exchange;
     private VirtualHost<?> _vhost;
-
+    private InstanceProperties _instanceProperties;
+    private ServerMessage<?> _messageWithNoHeaders;
 
     @Override
     public void setUp() throws Exception
@@ -64,13 +57,18 @@ public class TopicExchangeTest extends QpidTestCase
         super.setUp();
         BrokerTestHelper.setUp();
         _vhost = BrokerTestHelper.createVirtualHost(getName());
+
         Map<String,Object> attributes = new HashMap<>();
         attributes.put(Exchange.NAME, "test");
         attributes.put(Exchange.DURABLE, false);
         attributes.put(Exchange.TYPE, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
 
-        _exchange = (TopicExchangeImpl) _vhost.createChild(Exchange.class, attributes);
+        _exchange = (TopicExchange) _vhost.createChild(Exchange.class, attributes);
         _exchange.open();
+
+        _instanceProperties = mock(InstanceProperties.class);
+        _messageWithNoHeaders = createTestMessage(Collections.emptyMap());
+
     }
 
     @Override
@@ -90,506 +88,509 @@ public class TopicExchangeTest extends QpidTestCase
         }
     }
 
-    private Queue<?> createQueue(String name)
-    {
-        Map<String,Object> attributes = new HashMap<>();
-        attributes.put(Queue.NAME, name);
-        return _vhost.createChild(Queue.class, attributes);
-    }
-
+    /* Thus the routing pattern *.stock.# matches the routing keys usd.stock and eur.stock.db but not stock.nasdaq. */
     public void testNoRoute() throws Exception
     {
-        Queue<?> queue = createQueue("a*#b");
-        _exchange.bind(queue.getName(), "a.*.#.b", null, false);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        routeMessage("a.b", 0l);
+        _exchange.bind(queue.getName(), "*.stock.#", null, false);
 
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "stock.nasdaq",
+                                                                                       _instanceProperties);
+        assertFalse("Message unexpected routed to queue after bind", result.hasRoutes());
     }
 
     public void testDirectMatch() throws Exception
     {
-        Queue<?> queue = createQueue("ab");
-        _exchange.bind(queue.getName(), "a.b", null, false);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        routeMessage("a.b",0l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        _exchange.bind(queue.getName(), "a.b", null, false);
 
-        Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "a.b",
+                                                                                       _instanceProperties);
 
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        assertEquals("Message unexpected routed to queue after bind", 1, result.getNumberOfRoutes());
 
-        int queueCount = routeMessage("a.c",1l);
-        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        result = _exchange.route(_messageWithNoHeaders, "a.c", _instanceProperties);
+        Assert.assertEquals(0, result.getNumberOfRoutes());
     }
 
-
+    /** * matches a single word */
     public void testStarMatch() throws Exception
     {
-        Queue<?> queue = createQueue("a*");
-        _exchange.bind(queue.getName(), "a.*", null, false);
-
-        routeMessage("a.b",0l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
-
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
-
-
-        routeMessage("a.c",1l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        _exchange.bind(queue.getName(), "a.*", null, false);
 
-        Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "a.b",
+                                                                                       _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        result = _exchange.route(_messageWithNoHeaders, "a.bb", _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
-        int queueCount = routeMessage("a",2l);
-        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
+        result = _exchange.route(_messageWithNoHeaders, "a.b.c", _instanceProperties);
+        Assert.assertEquals(0, result.getNumberOfRoutes());
 
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        result = _exchange.route(_messageWithNoHeaders, "a", _instanceProperties);
+        Assert.assertEquals(0, result.getNumberOfRoutes());
     }
 
+    /** # matches zero or more words */
     public void testHashMatch() throws Exception
     {
-        Queue<?> queue = createQueue("a#");
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
         _exchange.bind(queue.getName(), "a.#", null, false);
 
-        routeMessage("a.b.c",0l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
-
-        Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
-
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
-
-        routeMessage("a.b",1l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
-
-        Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
-
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
-
-
-        routeMessage("a.c",2l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "a.b",
+                                                                                       _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
-        Assert.assertEquals("Wrong message received", 2l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        result = _exchange.route(_messageWithNoHeaders, "a.bb", _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        result = _exchange.route(_messageWithNoHeaders, "a.b.c", _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
-        routeMessage("a",3l);
+        result = _exchange.route(_messageWithNoHeaders, "a", _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
-
-        Assert.assertEquals("Wrong message received", 3l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
-
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
-
-
-        int queueCount = routeMessage("b", 4l);
-        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
-
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        result = _exchange.route(_messageWithNoHeaders, "b", _instanceProperties);
+        Assert.assertEquals(0, result.getNumberOfRoutes());
     }
 
 
     public void testMidHash() throws Exception
     {
-        Queue<?> queue = createQueue("a");
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
         _exchange.bind(queue.getName(), "a.*.#.b", null, false);
 
-        routeMessage("a.c.d.b",0l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
-
-        Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
-
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "a.c.d.b",
+                                                                                       _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
-        routeMessage("a.c.b",1l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
-
-        Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
-
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        result = _exchange.route(_messageWithNoHeaders, "a.c.d.d.b", _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
+        result = _exchange.route(_messageWithNoHeaders, "a.c.b", _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
     }
 
     public void testMatchAfterHash() throws Exception
     {
-        Queue<?> queue = createQueue("a#");
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
         _exchange.bind(queue.getName(), "a.*.#.b.c", null, false);
 
-        int queueCount = routeMessage("a.c.b.b",0l);
-        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
-
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
-
-
-        routeMessage("a.a.b.c",1l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "a.c.b.b",
+                                                                                       _instanceProperties);
+        Assert.assertEquals(0, result.getNumberOfRoutes());
 
-        Assert.assertEquals("Wrong message received",
-                            1l,
-                            queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+        result = _exchange.route(_messageWithNoHeaders, "a.a.b.c", _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
-
-        queueCount = routeMessage("a.b.c.b",2l);
-        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
-
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
-
-        routeMessage("a.b.c.b.c",3l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
-
-        Assert.assertEquals("Wrong message received", 3l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
-
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        result = _exchange.route(_messageWithNoHeaders, "a.b.c.b", _instanceProperties);
+        Assert.assertEquals(0, result.getNumberOfRoutes());
 
+        result = _exchange.route(_messageWithNoHeaders, "a.b.c.b.c", _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
     }
 
 
     public void testHashAfterHash() throws Exception
     {
-        Queue<?> queue = createQueue("a#");
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
         _exchange.bind(queue.getName(), "a.*.#.b.c.#.d", null, false);
 
-        int queueCount = routeMessage("a.c.b.b.c",0l);
-        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
-
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
-
-        routeMessage("a.a.b.c.d",1l);
 
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "a.c.b.b.c",
+                                                                                       _instanceProperties);
+        Assert.assertEquals(0, result.getNumberOfRoutes());
 
-        Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
-
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        result = _exchange.route(_messageWithNoHeaders,
+                                 "a.a.b.c.d",
+                                 _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
     }
 
     public void testHashHash() throws Exception
     {
-        Queue<?> queue = createQueue("a#");
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
         _exchange.bind(queue.getName(), "a.#.*.#.d", null, false);
 
-        int queueCount = routeMessage("a.c.b.b.c",0l);
-        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
-
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
-
-        routeMessage("a.a.b.c.d",1l);
-
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "a.c.b.b.c",
+                                                                                       _instanceProperties);
+        Assert.assertEquals(0, result.getNumberOfRoutes());
 
-        Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
-
-        queue.clearQueue();
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        result = _exchange.route(_messageWithNoHeaders, "a.c.b.b.c", _instanceProperties);
+        Assert.assertEquals(0, result.getNumberOfRoutes());
 
+        result = _exchange.route(_messageWithNoHeaders, "a.a.b.c.d", _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
     }
 
     public void testSubMatchFails() throws Exception
     {
-        Queue<?> queue = createQueue("a");
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
         _exchange.bind(queue.getName(), "a.b.c.d", null, false);
 
-        int queueCount = routeMessage("a.b.c",0l);
-        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
-
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
-
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "a.b.c",
+                                                                                       _instanceProperties);
+        Assert.assertEquals(0, result.getNumberOfRoutes());
     }
 
-    public void testMoreRouting() throws Exception
+    public void testRouteToManyQueues() throws Exception
     {
-        Queue<?> queue = createQueue("a");
-        _exchange.bind(queue.getName(), "a.b", null, false);
+        Queue<?> queue1 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue1"));
+        Queue<?> queue2 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue2"));
+        _exchange.bind(queue1.getName(), "a.b", null, false);
+        _exchange.bind(queue2.getName(), "a.*", null, false);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "a.b",
+                                                                                       _instanceProperties);
+        Assert.assertEquals(2, result.getNumberOfRoutes());
 
-        int queueCount = routeMessage("a.b.c",0l);
-        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
+        result = _exchange.route(_messageWithNoHeaders,
+                                 "a.c",
+                                 _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        _exchange.deleteBinding("a.b", queue1);
+
+        result = _exchange.route(_messageWithNoHeaders,
+                                 "a.b",
+                                 _instanceProperties);
+        Assert.assertEquals(1, result.getNumberOfRoutes());
 
     }
 
-    public void testMoreQueue() throws Exception
+    public void testRouteToQueueWithSelector()
     {
-        Queue<?> queue = createQueue("a");
-        _exchange.bind(queue.getName(), "a.b", null, false);
+        String bindingKey = "mybinding";
 
-        int queueCount = routeMessage("a",0l);
-        Assert.assertEquals("Message should not route to any queues", 0, queueCount);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        Assert.assertEquals(0, queue.getQueueDepthMessages());
+        InstanceProperties instanceProperties = mock(InstanceProperties.class);
+        ServerMessage<?> matchingMessage = createTestMessage(Collections.singletonMap("prop", true));
+        ServerMessage<?> unmatchingMessage = createTestMessage(Collections.singletonMap("prop", false));
 
-    }
+        boolean bind = _exchange.bind(queue.getName(), bindingKey,
+                                      Collections.singletonMap(JMS_SELECTOR.toString(), "prop = True"),
+                                      false);
+        assertTrue("Bind operation should be successful", bind);
 
-    public void testRouteWithJMSSelector() throws Exception
-    {
-        Queue<?> queue = createQueue("queue1");
-        final String bindingKey = "bindingKey";
+        RoutingResult<ServerMessage<?>> result = _exchange.route(matchingMessage, "mybinding", instanceProperties);
+        assertTrue("Message with matching selector not routed to queue", result.hasRoutes());
 
-        Map<String, Object> bindArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 5");
-        _exchange.bind(queue.getName(), bindingKey, bindArgs, false);
+        result = _exchange.route(unmatchingMessage, "mybinding", instanceProperties);
+        assertFalse("Message without matching selector unexpectedly routed to queue", result.hasRoutes());
 
-        ServerMessage matchMsg1 = mock(ServerMessage.class);
-        when(matchMsg1.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        AMQMessageHeader msgHeader1 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
-        when(matchMsg1.getMessageHeader()).thenReturn(msgHeader1);
-        routeMessage(matchMsg1, bindingKey, 1);
-        Assert.assertEquals("First message should be routed to queue", 1, queue.getQueueDepthMessages());
+        boolean unbind = _exchange.unbind(queue.getName(), bindingKey);
+        assertTrue("Unbind operation should be successful", unbind);
 
-        ServerMessage nonmatchMsg2 = mock(ServerMessage.class);
-        when(nonmatchMsg2.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        result = _exchange.route(matchingMessage, "mybinding", instanceProperties);
+        assertFalse("Message with matching selector unexpectedly routed to queue after unbind", result.hasRoutes());
+    }
 
-        AMQMessageHeader msgHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 5));
-        when(nonmatchMsg2.getMessageHeader()).thenReturn(msgHeader2);
-        routeMessage(nonmatchMsg2, bindingKey, 2);
-        Assert.assertEquals("Second message should not be routed to queue", 1, queue.getQueueDepthMessages());
+    public void testRouteToQueueViaTwoExchanges()
+    {
+        String bindingKey = "key";
 
-        ServerMessage nonmatchMsg3 = mock(ServerMessage.class);
-        when(nonmatchMsg3.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
 
-        AMQMessageHeader msgHeader3 = createMessageHeader(Collections.<String, Object>emptyMap());
-        when(nonmatchMsg3.getMessageHeader()).thenReturn(msgHeader3);
-        routeMessage(nonmatchMsg3, bindingKey, 3);
-        Assert.assertEquals("Third message should not be routed to queue", 1, queue.getQueueDepthMessages());
+        Exchange via = _vhost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        ServerMessage matchMsg4 = mock(ServerMessage.class);
-        when(matchMsg4.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        boolean exchToViaBind = _exchange.bind(via.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
 
-        AMQMessageHeader msgHeader4 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7));
-        when(matchMsg4.getMessageHeader()).thenReturn(msgHeader4);
-        routeMessage(matchMsg4, bindingKey, 4);
-        Assert.assertEquals("First message should be routed to queue", 2, queue.getQueueDepthMessages());
+        boolean viaToQueueBind = via.bind(queue.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
 
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       bindingKey,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
     }
 
-    public void testUpdateBindingReplacingSelector() throws Exception
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKey()
     {
-        Queue<?> queue = createQueue("queue1");
-        final String bindingKey = "a";
-
-        Map<String, Object> originalArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 5");
-        _exchange.bind(queue.getName(), bindingKey, originalArgs, false);
-
-        AMQMessageHeader mgsHeader1 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
-        ServerMessage msg1 = mock(ServerMessage.class);
-        when(msg1.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        when(msg1.getMessageHeader()).thenReturn(mgsHeader1);
-
-        routeMessage(msg1, bindingKey, 1);
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
 
-        // Update the binding
-        Map<String, Object> newArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 6");
-        _exchange.replaceBinding(bindingKey, queue, newArgs);
+        Exchange via = _vhost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        // Message that would have matched the original selector but not the new
-        AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
-        ServerMessage msg2 = mock(ServerMessage.class);
-        when(msg2.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        when(msg2.getMessageHeader()).thenReturn(mgsHeader2);
+        String bindingKey = "key";
+        String replacementKey = "key1";
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               bindingKey,
+                                               Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY,
+                                                                        replacementKey),
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
 
-        routeMessage(msg2, bindingKey, 2);
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        boolean viaToQueueBind = via.bind(queue.getName(), replacementKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
 
-        // Message that matches only the second
-        AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7));
-        ServerMessage msg3 = mock(ServerMessage.class);
-        when(msg3.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        when(msg3.getMessageHeader()).thenReturn(mgsHeader3);
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       bindingKey,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
 
-        routeMessage(msg3, bindingKey, 2);
-        Assert.assertEquals(2, queue.getQueueDepthMessages());
 
+        result = _exchange.route(_messageWithNoHeaders, replacementKey, _instanceProperties);
+        assertFalse("Message unexpectedly was routed to queue", result.hasRoutes());
     }
 
-    // This demonstrates QPID-5785.  Deleting the exchange after this combination of binding
-    // updates generated a NPE
-    public void testUpdateBindingAddingSelector() throws Exception
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKeyAndFiltering()
     {
-        Queue<?> queue = createQueue("queue1");
-        final String bindingKey = "a";
-
-        _exchange.bind(queue.getName(), bindingKey, null, false);
+        String bindingKey = "key1";
+        String replacementKey = "key2";
 
-        ServerMessage msg1 = mock(ServerMessage.class);
-        when(msg1.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        routeMessage(msg1, bindingKey, 1);
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        Map<String, Object> viaExchangeArguments = new HashMap<>();
+        viaExchangeArguments.put(Exchange.NAME, getTestName() + "_via_exch");
+        viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
 
-        // Update the binding adding selector
-        Map<String, Object> newArgs = Collections.<String, Object>singletonMap(JMS_SELECTOR.toString(), "arg > 6");
-        _exchange.replaceBinding(bindingKey, queue, newArgs);
+        Exchange via = _vhost.createChild(Exchange.class, viaExchangeArguments);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        // Message that does not match the new selector
-        AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6));
-        ServerMessage msg2 = mock(ServerMessage.class);
-        when(msg2.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        when(msg2.getMessageHeader()).thenReturn(mgsHeader2);
 
-        routeMessage(msg2, bindingKey, 2);
-        Assert.assertEquals(1, queue.getQueueDepthMessages());
+        Map<String, Object> exchToViaBindArguments = new HashMap<>();
+        exchToViaBindArguments.put(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, replacementKey);
+        exchToViaBindArguments.put(JMS_SELECTOR.toString(), "prop = True");
 
-        // Message that matches the selector
-        AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7));
-        ServerMessage msg3 = mock(ServerMessage.class);
-        when(msg3.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               bindingKey,
+                                               exchToViaBindArguments,
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
 
-        when(msg3.getMessageHeader()).thenReturn(mgsHeader3);
+        boolean viaToQueueBind = via.bind(queue.getName(), replacementKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
 
-        routeMessage(msg3, bindingKey, 2);
-        Assert.assertEquals(2, queue.getQueueDepthMessages());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(createTestMessage(Collections.singletonMap("prop", true)),
+                                                                                       bindingKey,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
 
-        _exchange.delete();
+        result = _exchange.route(createTestMessage(Collections.singletonMap("prop", false)),
+                                 bindingKey,
+                                 _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
     }
 
-    private int routeMessage(String routingKey, long messageNumber)
+
+    public void testHierachicalRouteToQueueViaTwoExchangesWithReplacementRoutingKey()
     {
-        ServerMessage message = mock(ServerMessage.class);
-        when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        return routeMessage(message, routingKey, messageNumber);
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _vhost.createChild(Exchange.class, attributes);
+        Queue<?> queue1 =
+                _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue1"));
+        Queue<?> queue2 =
+                _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue2"));
+
+        String bindingKey1 = "a.#";
+        String bindingKey2 = "a.*";
+        String replacementKey1 = "key1";
+        String replacementKey2 = "key2";
+
+        assertTrue("Exchange to exchange bind operation should be successful", _exchange.bind(via.getName(),
+                                                                                              bindingKey1,
+                                                                                              Collections.singletonMap(
+                                                                                                      Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY,
+                                                                                                      replacementKey1),
+                                                                                              false));
+
+        assertTrue("Exchange to exchange bind operation should be successful", _exchange.bind(via.getName(),
+                                                                                              bindingKey2,
+                                                                                              Collections.singletonMap(
+                                                                                                      Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY,
+                                                                                                      replacementKey2),
+                                                                                              false));
+
+        assertTrue("Exchange to queue1 bind operation should be successful",
+                   via.bind(queue1.getName(), replacementKey1, Collections.emptyMap(), false));
+
+        assertTrue("Exchange to queue2 bind operation should be successful",
+                   via.bind(queue2.getName(), replacementKey2, Collections.emptyMap(), false));
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "a.b",
+                                                                                       _instanceProperties);
+        assertEquals("Unexpected number of routes", 2, result.getNumberOfRoutes());
+
+        result = _exchange.route(_messageWithNoHeaders, "a.b.c", _instanceProperties);
+        assertEquals("Unexpected number of routes", 1, result.getNumberOfRoutes());
+
+        assertTrue("Message is not routed into 'queue1'", result.getRoutes().contains(queue1));
     }
 
-    private int routeMessage(ServerMessage message, String routingKey, long messageNumber)
+
+    public void testUpdateBindingReplacingSelector() throws Exception
     {
-        when(message.getInitialRoutingAddress()).thenReturn(routingKey);
-        List<? extends BaseQueue> queues = routeToQueues(message, routingKey, InstanceProperties.EMPTY);
-        MessageReference ref = mock(MessageReference.class);
-        when(ref.getMessage()).thenReturn(message);
-        when(message.newReference()).thenReturn(ref);
-        when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
-        when(message.getMessageNumber()).thenReturn(messageNumber);
-        for(BaseQueue q : queues)
-        {
-            q.enqueue(message, null, null);
-        }
+        String bindingKey = "mybinding";
+
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        InstanceProperties instanceProperties = mock(InstanceProperties.class);
+        ServerMessage<?> matchingMessage = createTestMessage(Collections.singletonMap("prop", true));
+
+        boolean bind = _exchange.bind(queue.getName(), bindingKey,
+                                      Collections.singletonMap(JMS_SELECTOR.toString(), "prop = True"),
+                                      false);
+        assertTrue("Bind operation should be successful", bind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(matchingMessage, bindingKey, instanceProperties);
+        assertTrue("Message with matching selector not routed to queue", result.hasRoutes());
 
-        return queues.size();
+
+        _exchange.replaceBinding(bindingKey, queue, Collections.singletonMap(JMS_SELECTOR.toString(), "prop = False"));
+
+        result = _exchange.route(matchingMessage, bindingKey, instanceProperties);
+        assertFalse("Message unexpectedly routed to queue after rebind", result.hasRoutes());
+
+        result = _exchange.route(matchingMessage, bindingKey, instanceProperties);
+        assertFalse(result.hasRoutes());
+
+        matchingMessage = createTestMessage(Collections.singletonMap("prop", false));
+        result = _exchange.route(matchingMessage, bindingKey, instanceProperties);
+        assertTrue("Message not routed to queue", result.hasRoutes());
     }
 
-    private List<? extends BaseQueue> routeToQueues(final ServerMessage message,
-                                                    final String routingAddress,
-                                                    final InstanceProperties instanceProperties)
+    public void testUpdateBindingRemovingSelector() throws Exception
     {
-        RoutingResult result = _exchange.route(message, routingAddress, instanceProperties);
-        final List<BaseQueue> resultQueues = new ArrayList<>();
-        result.send(new ServerTransaction()
-        {
-            @Override
-            public long getTransactionStartTime()
-            {
-                return 0;
-            }
+        String bindingKey = "mybinding";
 
-            @Override
-            public long getTransactionUpdateTime()
-            {
-                return 0;
-            }
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-            @Override
-            public void addPostTransactionAction(final Action postTransactionAction)
-            {
+        InstanceProperties instanceProperties = mock(InstanceProperties.class);
+        ServerMessage<?> message = createTestMessage(Collections.singletonMap("prop", false));
 
-            }
+        boolean bind = _exchange.bind(queue.getName(), bindingKey,
+                                      Collections.singletonMap(JMS_SELECTOR.toString(), "prop = True"),
+                                      false);
+        assertTrue("Bind operation should be successful", bind);
 
-            @Override
-            public void dequeue(final MessageEnqueueRecord record, final Action postTransactionAction)
-            {
+        RoutingResult<ServerMessage<?>> result = _exchange.route(message, bindingKey, instanceProperties);
+        assertFalse("Message that does not match selector routed to queue", result.hasRoutes());
 
-            }
 
-            @Override
-            public void dequeue(final Collection<MessageInstance> messages, final Action postTransactionAction)
-            {
+        _exchange.replaceBinding(bindingKey, queue, Collections.emptyMap());
 
-            }
+        result = _exchange.route(message, bindingKey, instanceProperties);
+        assertTrue("Message not routed to queue after rebind", result.hasRoutes());
+    }
 
-            @Override
-            public void enqueue(final TransactionLogResource queue,
-                                final EnqueueableMessage message,
-                                final EnqueueAction postTransactionAction)
-            {
-                resultQueues.add((BaseQueue) queue);
-            }
+    public void testUpdateBindingAddingSelector() throws Exception
+    {
+        String bindingKey = "mybinding";
 
-            @Override
-            public void enqueue(final Collection<? extends BaseQueue> queues,
-                                final EnqueueableMessage message,
-                                final EnqueueAction postTransactionAction)
-            {
-                resultQueues.addAll(queues);
-            }
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-            @Override
-            public void commit()
-            {
+        InstanceProperties instanceProperties = mock(InstanceProperties.class);
+        ServerMessage<?> message = createTestMessage(Collections.singletonMap("prop", false));
 
-            }
+        boolean bind = _exchange.bind(queue.getName(), bindingKey,
+                                      Collections.emptyMap(),
+                                      false);
+        assertTrue("Bind operation should be successful", bind);
 
-            @Override
-            public void commit(final Runnable immediatePostTransactionAction)
-            {
+        RoutingResult<ServerMessage<?>> result = _exchange.route(message, bindingKey, instanceProperties);
+        assertTrue("Message not routed to queue", result.hasRoutes());
 
-            }
 
-            @Override
-            public void rollback()
-            {
+        _exchange.replaceBinding(bindingKey, queue, Collections.singletonMap(JMS_SELECTOR.toString(), "prop = false"));
 
-            }
+        result = _exchange.route(message, bindingKey, instanceProperties);
+        assertTrue("Message that matches selector not routed to queue after rebind", result.hasRoutes());
 
-            @Override
-            public boolean isTransactional()
-            {
-                return false;
-            }
-        }, null);
 
-        return resultQueues;
+        result = _exchange.route(message = createTestMessage(Collections.singletonMap("prop", true)), bindingKey, instanceProperties);
+        assertFalse("Message that does not match selector routed to queue after rebind", result.hasRoutes());
     }
 
-    private AMQMessageHeader createMessageHeader(Map<String, Object> headers)
+    public void testUpdateBindingChangeReplacementKey() throws Exception
     {
-        AMQMessageHeader messageHeader = mock(AMQMessageHeader.class);
-        for(Map.Entry<String, Object> entry : headers.entrySet())
-        {
-            String key = entry.getKey();
-            Object value = entry.getValue();
-
-            when(messageHeader.containsHeader(key)).thenReturn(true);
-            when(messageHeader.getHeader(key)).thenReturn(value);
-        }
-        return messageHeader;
+        String bindingKey = "mybinding";
+        String replacementKey = "key1";
+        String replacementKey2 = "key2";
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _vhost.createChild(Exchange.class, attributes);
+        Queue<?> queue =
+                _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               bindingKey,
+                                               Collections.emptyMap(),
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), replacementKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       bindingKey,
+                                                                                       _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
+
+
+        _exchange.bind(via.getName(),
+                       bindingKey,
+                       Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, replacementKey),
+                       true);
+
+        result = _exchange.route(_messageWithNoHeaders, bindingKey, _instanceProperties);
+        assertTrue("Message was not routed", result.hasRoutes());
+        assertTrue("Message was not routed to queue", result.getRoutes().contains(queue));
+
+        Queue<?> queue2 =
+                _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue2"));
+        assertTrue("Binding of queue2 failed",
+                   via.bind(queue2.getName(), replacementKey2, Collections.emptyMap(), false));
+
+        _exchange.bind(via.getName(),
+                       bindingKey,
+                       Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, replacementKey2),
+                       true);
+
+        result = _exchange.route(_messageWithNoHeaders, bindingKey, _instanceProperties);
+        assertTrue("Message was not routed", result.hasRoutes());
+        assertTrue("Message was not routed to queue2", result.getRoutes().contains(queue2));
     }
 
+    private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
+    {
+        AMQMessageHeader header = mock(AMQMessageHeader.class);
+        headerValues.forEach((key, value) -> when(header.getHeader(key)).thenReturn(value));
+
+        @SuppressWarnings("unchecked")
+        ServerMessage<?> message = mock(ServerMessage.class);
+        when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        when(message.getMessageHeader()).thenReturn(header);
+        return message;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/systests/src/test/java/org/apache/qpid/server/routing/ExchangeRoutingTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/routing/ExchangeRoutingTest.java b/systests/src/test/java/org/apache/qpid/server/routing/ExchangeRoutingTest.java
new file mode 100644
index 0000000..0d29082
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/server/routing/ExchangeRoutingTest.java
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.routing;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ExchangeRoutingTest extends QpidBrokerTestCase
+{
+
+    private static final String AMQP_MNG_QPID_EXCHANGE_DIRECT = "org.apache.qpid.DirectExchange";
+    private static final String AMQP_MNG_QPID_EXCHANGE_FANOUT = "org.apache.qpid.FanoutExchange";
+    private static final String AMQP_MNG_QPID_EXCHANGE_TOPIC = "org.apache.qpid.FanoutExchange";
+    private static final String AMQP_MNG_QPID_QUEUE_STANDARD = "org.apache.qpid.StandardQueue";
+    private String _queueName;
+    private String _exchName1;
+    private String _exchName2;
+    private Connection _connection;
+    private Session _session;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _queueName = getTestQueueName() + "_queue";
+        _exchName1 = getTestQueueName() + "_exch1";
+        _exchName2 = getTestQueueName() + "_exch2";
+
+        _connection = getConnection();
+        _connection.start();
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        createEntityUsingAmqpManagement(_queueName, _session, AMQP_MNG_QPID_QUEUE_STANDARD);
+
+    }
+
+    public void testExchangeToQueueRouting() throws Exception
+    {
+        String routingKey = "key";
+
+        createEntityUsingAmqpManagement(_exchName1, _session, AMQP_MNG_QPID_EXCHANGE_DIRECT);
+
+        final Map<String, Object> bindingArguments = new HashMap<>();
+        bindingArguments.put("destination", _queueName);
+        bindingArguments.put("bindingKey", routingKey);
+
+        performOperationUsingAmqpManagement(_exchName1,
+                                            "bind",
+                                            _session,
+                                            "org.apache.qpid.Exchange",
+                                            bindingArguments);
+
+        routeTest(_exchName1, _queueName, "unboundKey", 0, 0);
+        routeTest(_exchName1, _queueName, routingKey, 0, 1);
+    }
+
+    public void testExchangeToExchangeToQueueRouting() throws Exception
+    {
+        String bindingKey = "key";
+
+        createEntityUsingAmqpManagement(_exchName1, _session, AMQP_MNG_QPID_EXCHANGE_DIRECT);
+        createEntityUsingAmqpManagement(_exchName2, _session, AMQP_MNG_QPID_EXCHANGE_DIRECT);
+
+        final Map<String, Object> binding1Arguments = new HashMap<>();
+        binding1Arguments.put("destination", _exchName2);
+        binding1Arguments.put("bindingKey", bindingKey);
+
+        performOperationUsingAmqpManagement(_exchName1,
+                                            "bind",
+                                            _session,
+                                            "org.apache.qpid.Exchange",
+                                            binding1Arguments);
+
+        final Map<String, Object> binding2Arguments = new HashMap<>();
+        binding2Arguments.put("destination", _queueName);
+        binding2Arguments.put("bindingKey", bindingKey);
+
+        performOperationUsingAmqpManagement(_exchName2,
+                                            "bind",
+                                            _session,
+                                            "org.apache.qpid.Exchange",
+                                            binding2Arguments);
+
+        routeTest(_exchName1, _queueName, bindingKey, 0, 1);
+    }
+
+    public void testExchangeToExchangeToQueueRoutingWithReplacementRoutingKey() throws Exception
+    {
+        String bindingKey1 = "key1";
+        String bindingKey2 = "key2";
+
+        createEntityUsingAmqpManagement(_exchName1, _session, AMQP_MNG_QPID_EXCHANGE_DIRECT);
+        createEntityUsingAmqpManagement(_exchName2, _session, AMQP_MNG_QPID_EXCHANGE_DIRECT);
+
+        final Map<String, Object> binding1Arguments = new HashMap<>();
+        binding1Arguments.put("destination", _exchName2);
+        binding1Arguments.put("bindingKey", bindingKey1);
+        binding1Arguments.put("arguments",
+                              new ObjectMapper().writeValueAsString(Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY,
+                                                                                             bindingKey2)));
+
+        performOperationUsingAmqpManagement(_exchName1,
+                                            "bind",
+                                            _session,
+                                            "org.apache.qpid.Exchange",
+                                            binding1Arguments);
+
+
+        final Map<String, Object> binding2Arguments = new HashMap<>();
+        binding2Arguments.put("destination", _queueName);
+        binding2Arguments.put("bindingKey", bindingKey2);
+
+        performOperationUsingAmqpManagement(_exchName2,
+                                            "bind",
+                                            _session,
+                                            "org.apache.qpid.Exchange",
+                                            binding2Arguments);
+
+        routeTest(_exchName1, _queueName, bindingKey1, 0, 1);
+    }
+
+    private void routeTest(final String fromExchangeName,
+                           final String queueName,
+                           final String routingKey,
+                           final int expectedDepthBefore,
+                           final int expectedDepthAfter) throws Exception
+    {
+        Destination ingressExchangeDest = _session.createQueue(getDestinationAddress(fromExchangeName, routingKey));
+        Queue queueDest = _session.createQueue(queueName);
+
+        assertEquals(String.format("Unexpected number of messages on queue '%s'", queueName),
+                     expectedDepthBefore, getQueueDepth(_connection, queueDest));
+
+        sendMessage(_session, ingressExchangeDest, 1);
+
+        assertEquals(String.format("Unexpected number of messages on queue '%s", queueName),
+                     expectedDepthAfter, getQueueDepth(_connection, queueDest));
+    }
+
+    private String getDestinationAddress(final String exchangeName, final String routingKey)
+    {
+        return isBroker10() ? String.format("%s/%s", exchangeName,routingKey): String.format("ADDR:%s/%s", exchangeName,routingKey);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-broker-j git commit: QPID-7771: [Java Broker] Add support for binding argument 'x-replacement-routing-key'

Posted by or...@apache.org.
QPID-7771: [Java Broker] Add support for binding argument 'x-replacement-routing-key'


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/1abc9359
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/1abc9359
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/1abc9359

Branch: refs/heads/master
Commit: 1abc93597c5b2ff934fa7febee459a732791e3fa
Parents: 6d7ac36
Author: Keith Wall <kw...@apache.org>
Authored: Tue Sep 5 16:17:29 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Sep 12 16:17:27 2017 +0100

----------------------------------------------------------------------
 .../server/exchange/DirectExchangeImpl.java     | 165 +++--
 .../server/exchange/FanoutExchangeImpl.java     | 269 +++----
 ...FilterManagerReplacementRoutingKeyTuple.java |  46 ++
 .../qpid/server/exchange/HeadersBinding.java    |  21 +-
 .../server/exchange/HeadersExchangeImpl.java    |  73 +-
 .../qpid/server/exchange/TopicExchangeImpl.java | 193 +++--
 .../exchange/topic/TopicExchangeResult.java     | 132 ++--
 .../qpid/server/exchange/topic/TopicParser.java |  48 +-
 .../qpid/server/message/RoutingResult.java      |  11 +
 .../org/apache/qpid/server/model/Binding.java   |   1 +
 .../org/apache/qpid/server/model/Exchange.java  |   3 +-
 .../server/exchange/DirectExchangeTest.java     | 218 +++++-
 .../server/exchange/FanoutExchangeTest.java     | 471 ++++++------
 .../server/exchange/HeadersExchangeTest.java    | 401 +++++-----
 .../qpid/server/exchange/TopicExchangeTest.java | 731 ++++++++++---------
 .../server/routing/ExchangeRoutingTest.java     | 173 +++++
 16 files changed, 1603 insertions(+), 1353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
index 6c4be90..7ce9c37 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeImpl.java
@@ -20,11 +20,11 @@
  */
 package org.apache.qpid.server.exchange;
 
+import static org.apache.qpid.server.model.Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY;
+
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -40,7 +40,6 @@ import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
@@ -51,45 +50,46 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
 
     private final class BindingSet
     {
-        private final Set<MessageDestination> _unfilteredQueues;
-        private final Map<MessageDestination, FilterManager> _filteredQueues;
+        private final Map<MessageDestination, String> _unfilteredDestinations;
+        private final Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> _filteredDestinations;
 
-        public BindingSet()
+        BindingSet()
         {
-            _unfilteredQueues = Collections.emptySet();
-            _filteredQueues = Collections.emptyMap();
+            _unfilteredDestinations = Collections.emptyMap();
+            _filteredDestinations = Collections.emptyMap();
         }
 
-        private BindingSet(final Set<MessageDestination> unfilteredQueues,
-                           final Map<MessageDestination, FilterManager> filteredQueues)
+        private BindingSet(final Map<MessageDestination, String> unfilteredDestinations,
+                           final Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations)
         {
-            _unfilteredQueues = unfilteredQueues;
-            _filteredQueues = filteredQueues;
+            _unfilteredDestinations = unfilteredDestinations;
+            _filteredDestinations = filteredDestinations;
         }
 
-        public Set<MessageDestination> getUnfilteredQueues()
+        Map<MessageDestination, String> getUnfilteredDestinations()
         {
-            return _unfilteredQueues;
+            return _unfilteredDestinations;
         }
 
-        public boolean hasFilteredQueues()
+        boolean hasFilteredQueues()
         {
-            return !_filteredQueues.isEmpty();
+            return !_filteredDestinations.isEmpty();
         }
 
         boolean isEmpty()
         {
-            return _unfilteredQueues.isEmpty() && _filteredQueues.isEmpty();
+            return _unfilteredDestinations.isEmpty() && _filteredDestinations.isEmpty();
         }
 
-        public Map<MessageDestination,FilterManager> getFilteredQueues()
+        Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> getFilteredDestinations()
         {
-            return _filteredQueues;
+            return _filteredDestinations;
         }
 
         BindingSet putBinding(MessageDestination destination, Map<String, Object> arguments, boolean force)
         {
-            if(!force && (_unfilteredQueues.contains(destination) || _filteredQueues.containsKey(destination)))
+            if (!force && (_unfilteredDestinations.containsKey(destination) || _filteredDestinations.containsKey(
+                    destination)))
             {
                 return this;
             }
@@ -97,72 +97,81 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
             {
                 try
                 {
-                    Set<MessageDestination> unfilteredQueues;
-                    Map<MessageDestination, FilterManager> filteredQueues;
-                    if (_unfilteredQueues.contains(destination))
+                    Map<MessageDestination, String> unfilteredDestinations;
+                    Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations;
+                    if (_unfilteredDestinations.containsKey(destination))
                     {
-                        unfilteredQueues = new HashSet<>(_unfilteredQueues);
-                        unfilteredQueues.remove(destination);
+                        unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
+                        unfilteredDestinations.remove(destination);
                     }
                     else
                     {
-                        unfilteredQueues = _unfilteredQueues;
+                        unfilteredDestinations = _unfilteredDestinations;
                     }
 
-                    filteredQueues = new HashMap<>(_filteredQueues);
-                    filteredQueues.put(destination,
-                                       FilterSupport.createMessageFilter(arguments, (Queue<?>) destination));
+                    filteredDestinations = new HashMap<>(_filteredDestinations);
+                    FilterManager messageFilter = FilterSupport.createMessageFilter(arguments, destination);
+                    String replacementRoutingKey = arguments.containsKey(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY)
+                            ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
+                            : null;
+                    filteredDestinations.put(destination,
+                                       new FilterManagerReplacementRoutingKeyTuple(messageFilter,
+                                                                                   replacementRoutingKey));
 
-                    return new BindingSet(Collections.unmodifiableSet(unfilteredQueues), Collections.unmodifiableMap(filteredQueues));
+                    return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations),
+                                          Collections.unmodifiableMap(filteredDestinations));
 
                 }
                 catch (AMQInvalidArgumentException e)
                 {
-                    _logger.warn("Binding ignored: cannot parse filter on binding of queue '" + destination.getName()
-                                 + "' to exchange '" + DirectExchangeImpl.this.getName()
-                                 + "' with arguments: " + arguments, e);
+                    _logger.warn(
+                            "Binding ignored: cannot parse filter on binding of destination '{}' to exchange '{}' with arguments: {}",
+                            destination.getName(),
+                            DirectExchangeImpl.this.getName(),
+                            arguments,
+                            e);
                     return this;
                 }
 
             }
             else
             {
-                Set<MessageDestination> unfilteredQueues;
-                Map<MessageDestination, FilterManager> filteredQueues;
-                if (_filteredQueues.containsKey(destination))
+                Map<MessageDestination, String> unfilteredDestinations;
+                Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations;
+                if (_filteredDestinations.containsKey(destination))
                 {
-                    filteredQueues = new HashMap<>(_filteredQueues);
-                    filteredQueues.remove(destination);
+                    filteredDestinations = new HashMap<>(_filteredDestinations);
+                    filteredDestinations.remove(destination);
                 }
                 else
                 {
-                    filteredQueues = _filteredQueues;
+                    filteredDestinations = _filteredDestinations;
                 }
 
-                unfilteredQueues = new HashSet<>(_unfilteredQueues);
-                unfilteredQueues.add(destination);
-
-                return new BindingSet(Collections.unmodifiableSet(unfilteredQueues), Collections.unmodifiableMap(filteredQueues));
+                unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
+                Object replacementRoutingKey = arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY);
+                unfilteredDestinations.put(destination, replacementRoutingKey == null ? null : String.valueOf(replacementRoutingKey));
+                return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations), Collections.unmodifiableMap(filteredDestinations));
 
             }
         }
 
-        public BindingSet removeBinding(final MessageDestination destination)
+        BindingSet removeBinding(final MessageDestination destination)
         {
-            Set<MessageDestination> unfilteredQueues;
-            Map<MessageDestination, FilterManager> filteredQueues;
-            if (_unfilteredQueues.contains(destination))
+            Map<MessageDestination, String> unfilteredDestinations;
+            Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations;
+            if (_unfilteredDestinations.containsKey(destination))
             {
-                unfilteredQueues = new HashSet<>(_unfilteredQueues);
-                unfilteredQueues.remove(destination);
+                unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
+                unfilteredDestinations.remove(destination);
 
-                return new BindingSet(Collections.unmodifiableSet(unfilteredQueues),_filteredQueues);
+                return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations), _filteredDestinations);
             }
-            else if(_filteredQueues.containsKey(destination))
+            else if(_filteredDestinations.containsKey(destination))
             {
-                filteredQueues = new HashMap<>(_filteredQueues);
-                filteredQueues.remove(destination);
-                return new BindingSet(_unfilteredQueues, Collections.unmodifiableMap(filteredQueues));
+                filteredDestinations = new HashMap<>(_filteredDestinations);
+                filteredDestinations.remove(destination);
+                return new BindingSet(_unfilteredDestinations, Collections.unmodifiableMap(filteredDestinations));
             }
             else
             {
@@ -172,52 +181,54 @@ public class DirectExchangeImpl extends AbstractExchange<DirectExchangeImpl> imp
         }
     }
 
-    private final ConcurrentMap<String, BindingSet> _bindingsByKey =
-            new ConcurrentHashMap<String, BindingSet>();
+    private final ConcurrentMap<String, BindingSet> _bindingsByKey = new ConcurrentHashMap<>();
 
     @ManagedObjectFactoryConstructor
-    public DirectExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
+    DirectExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
     {
         super(attributes, vhost);
     }
 
 
     @Override
-    public  <M extends ServerMessage<? extends StorableMessageMetaData>> void doRoute(final M payload,
-                                                                                      final String routingKey,
-                                                                                      final InstanceProperties instanceProperties,
-                                                                                      final RoutingResult<M> result)
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> void doRoute(final M payload,
+                                                                                     final String routingKey,
+                                                                                     final InstanceProperties instanceProperties,
+                                                                                     final RoutingResult<M> result)
     {
-
         BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
-
-        if(bindings != null)
+        if (bindings != null)
         {
-            final Set<MessageDestination> unfilteredQueues = bindings.getUnfilteredQueues();
-            for(MessageDestination destination : unfilteredQueues)
+            final Map<MessageDestination, String> unfilteredDestinations = bindings.getUnfilteredDestinations();
+            for (MessageDestination destination : unfilteredDestinations.keySet())
             {
-                result.add(destination.route(payload, routingKey, instanceProperties));
+                String actualRoutingKey = unfilteredDestinations.get(destination) == null
+                        ? routingKey
+                        : unfilteredDestinations.get(destination);
+                result.add(destination.route(payload, actualRoutingKey, instanceProperties));
             }
 
-            if(bindings.hasFilteredQueues())
+            if (bindings.hasFilteredQueues())
             {
                 Filterable filterable = Filterable.Factory.newInstance(payload, instanceProperties);
 
-                Map<MessageDestination, FilterManager> filteredQueues = bindings.getFilteredQueues();
-                for(Map.Entry<MessageDestination, FilterManager> entry : filteredQueues.entrySet())
+                Map<MessageDestination, FilterManagerReplacementRoutingKeyTuple> filteredDestinations =
+                        bindings.getFilteredDestinations();
+                for (Map.Entry<MessageDestination, FilterManagerReplacementRoutingKeyTuple> entry : filteredDestinations
+                        .entrySet())
                 {
-                    if(!unfilteredQueues.contains(entry.getKey()))
+                    FilterManagerReplacementRoutingKeyTuple tuple = entry.getValue();
+                    String actualRoutingKey = tuple.getReplacementRoutingKey() == null
+                            ? routingKey
+                            : tuple.getReplacementRoutingKey();
+
+                    if (tuple.getFilterManager().allAllow(filterable))
                     {
-                        FilterManager filter = entry.getValue();
-                        if(filter.allAllow(filterable))
-                        {
-                            result.add(entry.getKey().route(payload, routingKey, instanceProperties));
-                        }
+                        result.add(entry.getKey().route(payload, actualRoutingKey, instanceProperties));
                     }
                 }
             }
         }
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
index d6864d4..683d28b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeImpl.java
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.exchange;
 
-import java.util.ArrayList;
+import static org.apache.qpid.server.model.Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY;
+
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +40,6 @@ import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
@@ -46,169 +47,122 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
 {
     private static final Logger _logger = LoggerFactory.getLogger(FanoutExchangeImpl.class);
 
-    private static final Integer ONE = Integer.valueOf(1);
-
     private final class BindingSet
     {
-        private final Map<MessageDestination,Integer> _queues;
-
-        private final List<MessageDestination> _unfilteredQueues;
-        private final List<MessageDestination> _filteredQueues;
+        private final Map<MessageDestination, Map<BindingIdentifier, String>> _unfilteredDestinations;
+        private final Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>>
+                _filteredDestinations;
 
-        private final Map<MessageDestination,Map<BindingIdentifier, FilterManager>> _filteredBindings;
-
-        public BindingSet(final Map<MessageDestination, Integer> queues,
-                          final List<MessageDestination> unfilteredQueues,
-                          final List<MessageDestination> filteredQueues,
-                          final Map<MessageDestination, Map<BindingIdentifier, FilterManager>> filteredBindings)
+        BindingSet(final Map<MessageDestination, Map<BindingIdentifier, String>> unfilteredDestinations,
+                   final Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>> filteredDestinations)
         {
-            _queues = queues;
-            _unfilteredQueues = unfilteredQueues;
-            _filteredQueues = filteredQueues;
-            _filteredBindings = filteredBindings;
+            _unfilteredDestinations = unfilteredDestinations;
+            _filteredDestinations = filteredDestinations;
         }
 
-        public BindingSet()
+        BindingSet()
         {
-            _queues = Collections.emptyMap();
-            _unfilteredQueues = Collections.emptyList();
-            _filteredQueues = Collections.emptyList();
-            _filteredBindings = Collections.emptyMap();
+            _unfilteredDestinations = Collections.emptyMap();
+            _filteredDestinations = Collections.emptyMap();
         }
 
-        public BindingSet addBinding(final BindingIdentifier binding, final Map<String, Object> arguments)
+        BindingSet addBinding(final BindingIdentifier binding, final Map<String, Object> arguments)
         {
-                if(FilterSupport.argumentsContainFilter(arguments))
+            MessageDestination destination = binding.getDestination();
+            if (FilterSupport.argumentsContainFilter(arguments))
+            {
+                try
                 {
-                    try
-                    {
-                        List<MessageDestination> filteredQueues;
-                        if (!(_filteredQueues.contains(binding.getDestination())
-                              || _unfilteredQueues.contains(binding.getDestination())))
-                        {
-                            filteredQueues = new ArrayList<>(_filteredQueues);
-                            filteredQueues.add(binding.getDestination());
-                            filteredQueues = Collections.unmodifiableList(filteredQueues);
-                        }
-                        else
-                        {
-                            filteredQueues = _filteredQueues;
-                        }
-                        Map<MessageDestination, Map<BindingIdentifier, FilterManager>> filteredBindings =
-                                new HashMap<>(_filteredBindings);
-                        Map<BindingIdentifier, FilterManager> bindingsForQueue =
-                                filteredBindings.get(binding.getDestination());
-                        if (bindingsForQueue == null)
-                        {
-                            bindingsForQueue = new HashMap<>();
-                        }
-                        else
-                        {
-                            bindingsForQueue = new HashMap<>(bindingsForQueue);
-                        }
-                        bindingsForQueue.put(binding,
-                                             FilterSupport.createMessageFilter(arguments,
-                                                                               binding.getDestination()));
-                        filteredBindings.put(binding.getDestination(), bindingsForQueue);
-                        return new BindingSet(_queues, _unfilteredQueues, filteredQueues, Collections.unmodifiableMap(filteredBindings));
-                    }
-                    catch (AMQInvalidArgumentException e)
-                    {
-                        _logger.warn("Binding ignored: cannot parse filter on binding of queue '" + binding.getDestination().getName()
-                                     + "' to exchange '" + FanoutExchangeImpl.this.getName()
-                                     + "' with arguments: " + arguments, e);
-                        return this;
-                    }
+                    Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>>
+                            filteredDestinations = new HashMap<>(_filteredDestinations);
+
+                    filteredDestinations.computeIfAbsent(destination, messageDestination -> new HashMap<>());
+
+                    Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple> bindingsForDestination =
+                            new HashMap<>(filteredDestinations.get(destination));
+
+                    FilterManager filterManager = FilterSupport.createMessageFilter(arguments, destination);
+                    String replacementRoutingKey = arguments.containsKey(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY)
+                            ? String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY))
+                            : null;
+
+                    bindingsForDestination.put(binding,
+                                               new FilterManagerReplacementRoutingKeyTuple(filterManager,
+                                                                                           replacementRoutingKey));
+                    filteredDestinations.put(destination, Collections.unmodifiableMap(bindingsForDestination));
+                    return new BindingSet(_unfilteredDestinations, Collections.unmodifiableMap(filteredDestinations));
                 }
-                else
+                catch (AMQInvalidArgumentException e)
                 {
-                    Map<MessageDestination, Integer> queues = new HashMap<>(_queues);
-                    List<MessageDestination> unfilteredQueues;
-                    List<MessageDestination> filteredQueues;
-                    if (queues.containsKey(binding.getDestination()))
-                    {
-                        queues.put(binding.getDestination(), queues.get(binding.getDestination()) + 1);
-                        unfilteredQueues = _unfilteredQueues;
-                        filteredQueues = _filteredQueues;
-                    }
-                    else
-                    {
-                        queues.put(binding.getDestination(), ONE);
-                        unfilteredQueues = new ArrayList<>(_unfilteredQueues);
-                        unfilteredQueues.add((Queue<?>)binding.getDestination());
-                        unfilteredQueues = Collections.unmodifiableList(unfilteredQueues);
-                        if(_filteredQueues.contains(binding.getDestination()))
-                        {
-                            filteredQueues = new ArrayList<>(_filteredQueues);
-                            filteredQueues.remove(binding.getDestination());
-                            filteredQueues = Collections.unmodifiableList(filteredQueues);
-                        }
-                        else
-                        {
-                            filteredQueues = _filteredQueues;
-                        }
-                    }
-                    return new BindingSet(queues, unfilteredQueues, filteredQueues, _filteredBindings);
+                    _logger.warn(
+                            "Binding ignored: cannot parse filter on binding of destination '{}' to exchange '{}' with arguments: {}",
+                            destination.getName(),
+                            FanoutExchangeImpl.this.getName(),
+                            arguments,
+                            e);
+                    return this;
+                }
+            }
+            else
+            {
+                Map<MessageDestination, Map<BindingIdentifier, String>> unfilteredDestinations =
+                        new HashMap<>(_unfilteredDestinations);
+                unfilteredDestinations.computeIfAbsent(destination, messageDestination -> new HashMap<>());
+
+                String replacementRoutingKey = null;
+                if (arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY) != null)
+                {
+                    replacementRoutingKey = String.valueOf(arguments.get(BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY));
                 }
+
+                Map<BindingIdentifier, String> replacementRoutingKeysForDestination =
+                        new HashMap<>(unfilteredDestinations.get(destination));
+                replacementRoutingKeysForDestination.put(binding, replacementRoutingKey);
+
+                unfilteredDestinations.put(destination,
+                                           Collections.unmodifiableMap(replacementRoutingKeysForDestination));
+                return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations), _filteredDestinations);
+            }
         }
 
-        public BindingSet updateBinding(final BindingIdentifier binding, final Map<String, Object> newArguments)
+        BindingSet updateBinding(final BindingIdentifier binding, final Map<String, Object> newArguments)
         {
-
             return removeBinding(binding).addBinding(binding, newArguments);
         }
 
-        public BindingSet removeBinding(final BindingIdentifier binding)
+        BindingSet removeBinding(final BindingIdentifier binding)
         {
-            Queue<?> queue = (Queue<?>) binding.getDestination();
-            if(_filteredBindings.containsKey(queue) && _filteredBindings.get(queue).containsKey(binding))
+            MessageDestination destination = binding.getDestination();
+            if(_filteredDestinations.containsKey(destination) && _filteredDestinations.get(destination).containsKey(binding))
             {
-                final Map<MessageDestination, Map<BindingIdentifier, FilterManager>> filteredBindings = new HashMap<>(_filteredBindings);
-                final Map<BindingIdentifier, FilterManager> bindingsForQueue = new HashMap<>(filteredBindings.remove(queue));
-                bindingsForQueue.remove(binding);
-                List<MessageDestination> filteredQueues;
-                if(bindingsForQueue.isEmpty())
+                final Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>> filteredDestinations = new HashMap<>(_filteredDestinations);
+                final Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple> bindingsForDestination = new HashMap<>(filteredDestinations.get(destination));
+                bindingsForDestination.remove(binding);
+                if (bindingsForDestination.isEmpty())
                 {
-                    filteredQueues = new ArrayList<>(_filteredQueues);
-                    filteredQueues.remove(queue);
-                    filteredQueues = Collections.unmodifiableList(filteredQueues);
+                    filteredDestinations.remove(destination);
                 }
                 else
                 {
-                    filteredBindings.put(queue, bindingsForQueue);
-                    filteredQueues = _filteredQueues;
+                    filteredDestinations.put(destination, Collections.unmodifiableMap(bindingsForDestination));
                 }
-                return new BindingSet(_queues, _unfilteredQueues, filteredQueues, Collections.unmodifiableMap(filteredBindings));
+                return new BindingSet(_unfilteredDestinations, Collections.unmodifiableMap(filteredDestinations));
             }
-            else if(_unfilteredQueues.contains(queue))
+            else if(_unfilteredDestinations.containsKey(destination) && _unfilteredDestinations.get(destination).containsKey(binding))
             {
-                Map<MessageDestination, Integer> queues = new HashMap<>(_queues);
-                int count = queues.remove(queue);
-                List<MessageDestination> unfilteredQueues;
-                List<MessageDestination> filteredQueues;
-                if(count > 1)
+                Map<MessageDestination, Map<BindingIdentifier, String>> unfilteredDestinations = new HashMap<>(_unfilteredDestinations);
+                final Map<BindingIdentifier, String> bindingsForDestination = new HashMap<>(unfilteredDestinations.get(destination));
+                bindingsForDestination.remove(binding);
+                if (bindingsForDestination.isEmpty())
                 {
-                    queues.put(queue, --count);
-                    unfilteredQueues = _unfilteredQueues;
-                    filteredQueues = _filteredQueues;
+                    unfilteredDestinations.remove(destination);
                 }
                 else
                 {
-                    unfilteredQueues = new ArrayList<>(_unfilteredQueues);
-                    unfilteredQueues.remove(queue);
-                    unfilteredQueues = Collections.unmodifiableList(unfilteredQueues);
-                    if(_filteredBindings.containsKey(queue))
-                    {
-                        filteredQueues = new ArrayList<>(_filteredQueues);
-                        filteredQueues.add(queue);
-                        filteredQueues = Collections.unmodifiableList(filteredQueues);
-                    }
-                    else
-                    {
-                        filteredQueues = _filteredQueues;
-                    }
+                    unfilteredDestinations.put(destination, Collections.unmodifiableMap(bindingsForDestination));
                 }
-                return new BindingSet(Collections.unmodifiableMap(queues), unfilteredQueues, filteredQueues, _filteredBindings);
+
+                return new BindingSet(Collections.unmodifiableMap(unfilteredDestinations), _filteredDestinations);
             }
             else
             {
@@ -217,12 +171,8 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
         }
     }
 
-
     private volatile BindingSet _bindingSet = new BindingSet();
 
-    /**
-     * Maps from queue name to queue instances
-     */
 
     @ManagedObjectFactoryConstructor
     public FanoutExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
@@ -237,32 +187,47 @@ class FanoutExchangeImpl extends AbstractExchange<FanoutExchangeImpl> implements
                                                                                         final RoutingResult<M> result)
     {
         BindingSet bindingSet = _bindingSet;
-        for(MessageDestination destination : bindingSet._unfilteredQueues)
+
+        if (!bindingSet._unfilteredDestinations.isEmpty())
         {
-            result.add(destination.route(message, routingAddress, instanceProperties));
+            for (MessageDestination destination : bindingSet._unfilteredDestinations.keySet())
+            {
+                Set<String> replacementRoutingKeys =
+                        new HashSet<>(bindingSet._unfilteredDestinations.get(destination).values());
+
+                replacementRoutingKeys.forEach(
+                        replacementRoutingKey -> result.add(destination.route(message,
+                                                                              replacementRoutingKey == null
+                                                                                      ? routingAddress
+                                                                                      : replacementRoutingKey,
+                                                                              instanceProperties)));
+            }
         }
-        final Map<MessageDestination, Map<BindingIdentifier, FilterManager>> filteredBindings = bindingSet._filteredBindings;
-        if(!bindingSet._filteredQueues.isEmpty())
+
+        final Map<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>>
+                filteredDestinations = bindingSet._filteredDestinations;
+        if (!filteredDestinations.isEmpty())
         {
-            for(MessageDestination q : bindingSet._filteredQueues)
+            for (Map.Entry<MessageDestination, Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple>> entry :
+                    filteredDestinations.entrySet())
             {
-                final Map<BindingIdentifier, FilterManager> bindingMessageFilterMap = filteredBindings.get(q);
-                if(!(bindingMessageFilterMap == null || bindingSet._unfilteredQueues.contains(q)))
+                MessageDestination destination = entry.getKey();
+                final Map<BindingIdentifier, FilterManagerReplacementRoutingKeyTuple> bindingMessageFilterMap =
+                        entry.getValue();
+                for (FilterManagerReplacementRoutingKeyTuple tuple : bindingMessageFilterMap.values())
                 {
-                    for(FilterManager filter : bindingMessageFilterMap.values())
+
+                    FilterManager filter = tuple.getFilterManager();
+                    if (filter.allAllow(Filterable.Factory.newInstance(message, instanceProperties)))
                     {
-                        if(filter.allAllow(Filterable.Factory.newInstance(message, instanceProperties)))
-                        {
-                            result.add(q.route(message, routingAddress, instanceProperties));
-                            break;
-                        }
+                        String routingKey = tuple.getReplacementRoutingKey() == null
+                                ? routingAddress
+                                : tuple.getReplacementRoutingKey();
+                        result.add(destination.route(message, routingKey, instanceProperties));
                     }
                 }
             }
-
         }
-
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterManagerReplacementRoutingKeyTuple.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterManagerReplacementRoutingKeyTuple.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterManagerReplacementRoutingKeyTuple.java
new file mode 100644
index 0000000..3047755
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterManagerReplacementRoutingKeyTuple.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.server.filter.FilterManager;
+
+final class FilterManagerReplacementRoutingKeyTuple
+{
+    private final FilterManager _filterManager;
+    private final String _replacementRoutingKey; // Nullable
+
+    FilterManagerReplacementRoutingKeyTuple(final FilterManager filterManager,
+                                            final String replacementRoutingKey)
+    {
+        _filterManager = filterManager;
+        _replacementRoutingKey = replacementRoutingKey;
+    }
+
+    FilterManager getFilterManager()
+    {
+        return _filterManager;
+    }
+
+    String getReplacementRoutingKey()
+    {
+        return _replacementRoutingKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index aba2def..c14e03e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -35,7 +35,7 @@ import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Binding;
 
 /**
  * Defines binding and matching based on a set of headers.
@@ -46,8 +46,9 @@ class HeadersBinding
 
     private final Map<String,Object> _mappings;
     private final AbstractExchange.BindingIdentifier _binding;
-    private final Set<String> required = new HashSet<String>();
-    private final Map<String,Object> matches = new HashMap<String,Object>();
+    private final Set<String> required = new HashSet<>();
+    private final Map<String,Object> matches = new HashMap<>();
+    private final String _replacementRoutingKey;
     private boolean matchAny;
     private FilterManager _filter;
 
@@ -62,15 +63,18 @@ class HeadersBinding
     public HeadersBinding(AbstractExchange.BindingIdentifier binding, Map<String,Object> arguments)
     {
         _binding = binding;
-        if(_binding !=null)
+        arguments = arguments == null ? Collections.emptyMap() : arguments;
+        if(_binding != null)
         {
-            _mappings = arguments == null ? Collections.<String,Object>emptyMap() : arguments;
+            _mappings = arguments;
             initMappings();
         }
         else
         {
             _mappings = null;
         }
+        Object key = arguments.get(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY);
+        _replacementRoutingKey = key == null ? null : String.valueOf(key);
     }
 
     private void initMappings()
@@ -79,7 +83,7 @@ class HeadersBinding
         {
             try
             {
-                _filter = FilterSupport.createMessageFilter(_mappings, (Queue<?>) _binding.getDestination());
+                _filter = FilterSupport.createMessageFilter(_mappings, _binding.getDestination());
             }
             catch (AMQInvalidArgumentException e)
             {
@@ -278,6 +282,11 @@ class HeadersBinding
         return _binding == null ? 0 : _binding.hashCode();
     }
 
+    public String getReplacementRoutingKey()
+    {
+        return _replacementRoutingKey;
+    }
+
     private static class ExcludeAllFilter implements MessageFilter
     {
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
index 6ebd1b9..65eabb3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeImpl.java
@@ -21,22 +21,19 @@
 package org.apache.qpid.server.exchange;
 
 import java.util.Collections;
-import java.util.ListIterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
@@ -72,11 +69,7 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
 
     private static final Logger _logger = LoggerFactory.getLogger(HeadersExchangeImpl.class);
 
-    private final ConcurrentMap<String, CopyOnWriteArraySet<BindingIdentifier>> _bindingsByKey =
-                            new ConcurrentHashMap<>();
-
-    private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
-                            new CopyOnWriteArrayList<HeadersBinding>();
+    private final Set<HeadersBinding> _bindingHeaderMatchers = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     @ManagedObjectFactoryConstructor
     public HeadersExchangeImpl(final Map<String, Object> attributes, final QueueManagingVirtualHost<?> vhost)
@@ -86,7 +79,7 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
 
     @Override
     public <M extends ServerMessage<? extends StorableMessageMetaData>> void doRoute(M payload,
-                                                                                     final String routingKey,
+                                                                                     String routingKey,
                                                                                      final InstanceProperties instanceProperties,
                                                                                      RoutingResult<M> routingResult)
     {
@@ -96,15 +89,17 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
         {
             if (hb.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
             {
-                BindingIdentifier b = hb.getBinding();
-
+                MessageDestination destination = hb.getBinding().getDestination();
 
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Exchange " + getName() + ": delivering message with headers " +
-                                  payload.getMessageHeader() + " to " + b.getDestination().getName());
+                    _logger.debug("Exchange '{}' delivering message with headers '{}' to '{}'",
+                                  getName(), payload.getMessageHeader(), destination.getName());
                 }
-                routingResult.add(b.getDestination().route(payload, routingKey, instanceProperties));
+                String actualRoutingKey = hb.getReplacementRoutingKey() == null
+                        ? routingKey
+                        : hb.getReplacementRoutingKey();
+                routingResult.add(destination.route(payload, actualRoutingKey, instanceProperties));
             }
         }
     }
@@ -113,61 +108,19 @@ public class HeadersExchangeImpl extends AbstractExchange<HeadersExchangeImpl> i
     @Override
     protected void onBind(final BindingIdentifier binding, Map<String,Object> arguments)
     {
-        String bindingKey = binding.getBindingKey();
-        Queue<?> queue = (Queue<?>) binding.getDestination();
-
-        CopyOnWriteArraySet<BindingIdentifier> bindings = _bindingsByKey.get(bindingKey);
-
-        if(bindings == null)
-        {
-            bindings = new CopyOnWriteArraySet<>();
-            CopyOnWriteArraySet<BindingIdentifier> newBindings;
-            if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null)
-            {
-                bindings = newBindings;
-            }
-        }
-
-        if(_logger.isDebugEnabled())
-        {
-            _logger.debug("Exchange " + getName() + ": Binding " + queue.getName() +
-                          " with binding key '" +bindingKey + "' and args: " + arguments);
-        }
-
         _bindingHeaderMatchers.add(new HeadersBinding(binding, arguments));
-        bindings.add(binding);
-
     }
 
     @Override
     protected void onBindingUpdated(final BindingIdentifier binding, final Map<String, Object> arguments)
     {
-        HeadersBinding headersBinding = new HeadersBinding(binding, arguments);
-        ListIterator<HeadersBinding> iter = _bindingHeaderMatchers.listIterator();
-        while(iter.hasNext())
-        {
-            if(iter.next().equals(headersBinding))
-            {
-                iter.set(headersBinding);
-            }
-        }
-
+        _bindingHeaderMatchers.add(new HeadersBinding(binding, arguments));
     }
 
     @Override
     protected void onUnbind(final BindingIdentifier binding)
     {
-        assert binding != null;
-
-        CopyOnWriteArraySet<BindingIdentifier> bindings = _bindingsByKey.get(binding.getBindingKey());
-        if(bindings != null)
-        {
-            bindings.remove(binding);
-        }
-
-        boolean removedBinding = _bindingHeaderMatchers.remove(new HeadersBinding(binding, Collections.<String,Object>emptyMap()));
-        _logger.debug("Removing Binding: {}", removedBinding);
-
+        _bindingHeaderMatchers.remove(new HeadersBinding(binding, Collections.emptyMap()));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
index 4a30d8e..66933ec 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeImpl.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
@@ -42,7 +43,6 @@ import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -53,8 +53,7 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
 
     private final TopicParser _parser = new TopicParser();
 
-    private final Map<String, TopicExchangeResult> _topicExchangeResults =
-            new ConcurrentHashMap<String, TopicExchangeResult>();
+    private final Map<String, TopicExchangeResult> _topicExchangeResults = new ConcurrentHashMap<>();
 
     private final Map<BindingIdentifier, Map<String,Object>> _bindings = new HashMap<>();
 
@@ -72,115 +71,48 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
 
         _logger.debug("Updating binding of queue {} with routing key {}", destination.getName(), bindingKey);
 
-
         String routingKey = TopicNormalizer.normalize(bindingKey);
 
         try
         {
-
             if (_bindings.containsKey(binding))
             {
-                Map<String, Object> oldArgs = _bindings.put(binding, newArguments);
                 TopicExchangeResult result = _topicExchangeResults.get(routingKey);
-
-                if (FilterSupport.argumentsContainFilter(newArguments))
-                {
-                    if (FilterSupport.argumentsContainFilter(oldArgs))
-                    {
-                        result.replaceQueueFilter(destination,
-                                                  FilterSupport.createMessageFilter(oldArgs, destination),
-                                                  FilterSupport.createMessageFilter(newArguments, destination));
-                    }
-                    else
-                    {
-                        result.addFilteredQueue(destination, FilterSupport.createMessageFilter(newArguments, destination));
-                        result.removeUnfilteredQueue(destination);
-                    }
-                }
-                else
-                {
-                    if (FilterSupport.argumentsContainFilter(oldArgs))
-                    {
-                        result.addUnfilteredQueue(destination);
-                        result.removeFilteredQueue(destination, FilterSupport.createMessageFilter(oldArgs, destination));
-                    }
-                    else
-                    {
-                        // TODO - fix control flow
-                        return;
-                    }
-                }
-
+                updateTopicExchangeResult(result, binding, newArguments);
             }
         }
         catch (AMQInvalidArgumentException e)
         {
             throw new ConnectionScopedRuntimeException(e);
         }
-
-
     }
 
-    protected synchronized void registerQueue(final BindingIdentifier binding, Map<String,Object> arguments) throws AMQInvalidArgumentException
+    private synchronized void bind(final BindingIdentifier binding, Map<String,Object> arguments) throws AMQInvalidArgumentException
     {
         final String bindingKey = binding.getBindingKey();
-        Queue<?> queue = (Queue<?>) binding.getDestination();
-
-        _logger.debug("Registering queue {} with routing key {}", queue.getName(), bindingKey);
+        MessageDestination messageDestination = binding.getDestination();
 
+        _logger.debug("Registering messageDestination {} with routing key {}", messageDestination.getName(), bindingKey);
 
         String routingKey = TopicNormalizer.normalize(bindingKey);
+        TopicExchangeResult result = _topicExchangeResults.get(routingKey);
 
         if(_bindings.containsKey(binding))
         {
-            Map<String,Object> oldArgs = _bindings.put(binding, arguments);
-            TopicExchangeResult result = _topicExchangeResults.get(routingKey);
-
-            if(FilterSupport.argumentsContainFilter(arguments))
-            {
-                if(FilterSupport.argumentsContainFilter(oldArgs))
-                {
-                    result.replaceQueueFilter(queue,
-                                              FilterSupport.createMessageFilter(oldArgs, queue),
-                                              FilterSupport.createMessageFilter(arguments, queue));
-                }
-                else
-                {
-                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
-                    result.removeUnfilteredQueue(queue);
-                }
-            }
-            else
-            {
-                if(FilterSupport.argumentsContainFilter(oldArgs))
-                {
-                    result.addUnfilteredQueue(queue);
-                    result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue));
-                }
-                else
-                {
-                    // TODO - fix control flow
-                    return;
-                }
-            }
-
-            result.addBinding(binding);
-
+            updateTopicExchangeResult(result, binding, arguments);
         }
         else
         {
-
-            TopicExchangeResult result = _topicExchangeResults.get(routingKey);
             if(result == null)
             {
                 result = new TopicExchangeResult();
                 if(FilterSupport.argumentsContainFilter(arguments))
                 {
-                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
+                    result.addFilteredDestination(messageDestination, FilterSupport.createMessageFilter(arguments, messageDestination));
                 }
                 else
                 {
-                    result.addUnfilteredQueue(queue);
+                    result.addUnfilteredDestination(messageDestination);
                 }
                 _parser.addBinding(routingKey, result);
                 _topicExchangeResults.put(routingKey,result);
@@ -189,18 +121,17 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
             {
                 if(FilterSupport.argumentsContainFilter(arguments))
                 {
-                    result.addFilteredQueue(queue, FilterSupport.createMessageFilter(arguments, queue));
+                    result.addFilteredDestination(messageDestination, FilterSupport.createMessageFilter(arguments, messageDestination));
                 }
                 else
                 {
-                    result.addUnfilteredQueue(queue);
+                    result.addUnfilteredDestination(messageDestination);
                 }
             }
 
-            result.addBinding(binding);
             _bindings.put(binding, arguments);
+            result.addBinding(binding, arguments);
         }
-
     }
 
     @Override
@@ -213,17 +144,20 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
                 ? ""
                 : routingAddress;
 
-        final Collection<MessageDestination> matchedQueues =
-                getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey);
+        final Map<MessageDestination, Set<String>> matchedDestinations =
+                getMatchedDestinations(Filterable.Factory.newInstance(payload, instanceProperties), routingKey);
 
-        for(MessageDestination queue : matchedQueues)
+        for(Map.Entry<MessageDestination, Set<String>> entry : matchedDestinations.entrySet())
         {
-            result.add(queue.route(payload, routingAddress, instanceProperties));
+            MessageDestination destination = entry.getKey();
+            Set<String> replacementKeys = entry.getValue();
+            replacementKeys.forEach(replacementKey -> result.add(destination.route(payload, replacementKey == null ? routingAddress : replacementKey, instanceProperties)));
+
         }
     }
 
 
-    private synchronized boolean deregisterQueue(final BindingIdentifier binding)
+    private synchronized boolean unbind(final BindingIdentifier binding)
     {
         if(_bindings.containsKey(binding))
         {
@@ -240,8 +174,9 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
             {
                 try
                 {
-                    result.removeFilteredQueue((Queue<?>) binding.getDestination(), FilterSupport.createMessageFilter(bindingArgs,
-                                                                                                                      (Queue<?>) binding.getDestination()));
+                    result.removeFilteredDestination(binding.getDestination(),
+                                                     FilterSupport.createMessageFilter(bindingArgs,
+                                                                                       binding.getDestination()));
                 }
                 catch (AMQInvalidArgumentException e)
                 {
@@ -250,8 +185,10 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
             }
             else
             {
-                result.removeUnfilteredQueue((Queue<?>) binding.getDestination());
+                result.removeUnfilteredDestination(binding.getDestination());
             }
+
+            // shall we delete the result from _topicExchangeResults if result is empty?
             return true;
         }
         else
@@ -260,30 +197,37 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
         }
     }
 
-    private Collection<MessageDestination> getMatchedQueues(Filterable message, String routingKey)
+    private Map<MessageDestination, Set<String>> getMatchedDestinations(Filterable message, String routingKey)
     {
-
         Collection<TopicMatcherResult> results = _parser.parse(routingKey);
-        switch(results.size())
+        if (!results.isEmpty())
         {
-            case 0:
-                return Collections.EMPTY_SET;
-            case 1:
-                TopicMatcherResult[] resultQueues = new TopicMatcherResult[1];
-                results.toArray(resultQueues);
-                return ((TopicExchangeResult)resultQueues[0]).processMessage(message, null);
-            default:
-                Collection<MessageDestination> queues = new HashSet<>();
-                for(TopicMatcherResult result : results)
+            Map<MessageDestination, Set<String>> matchedDestinations = new HashMap<>();
+            for (TopicMatcherResult result : results)
+            {
+                TopicExchangeResult topicExchangeResult = (TopicExchangeResult) result;
+                Map<MessageDestination, String> destinations = topicExchangeResult.processMessage(message);
+                if (!destinations.isEmpty())
                 {
-                    TopicExchangeResult res = (TopicExchangeResult)result;
-
-                    queues = res.processMessage(message, queues);
+                    destinations.forEach((destination, replacementKey) ->
+                                 {
+                                     Set<String> currentKeys = matchedDestinations.get(destination);
+                                     if (currentKeys == null)
+                                     {
+                                         matchedDestinations.put(destination, Collections.singleton(replacementKey));
+                                     }
+                                     else if (!currentKeys.contains(replacementKey))
+                                     {
+                                         Set<String> newKeys = new HashSet<>(currentKeys);
+                                         newKeys.add(replacementKey);
+                                         matchedDestinations.put(destination, newKeys);
+                                     }
+                                 });
                 }
-                return queues;
+            }
+            return matchedDestinations;
         }
-
-
+        return Collections.emptyMap();
     }
 
     @Override
@@ -291,7 +235,7 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
     {
         try
         {
-            registerQueue(binding, arguments);
+            bind(binding, arguments);
         }
         catch (AMQInvalidArgumentException e)
         {
@@ -303,7 +247,36 @@ class TopicExchangeImpl extends AbstractExchange<TopicExchangeImpl> implements T
     @Override
     protected void onUnbind(final BindingIdentifier binding)
     {
-        deregisterQueue(binding);
+        unbind(binding);
+    }
+
+    private void updateTopicExchangeResult(final TopicExchangeResult result, final BindingIdentifier binding,
+                                           final Map<String, Object> newArguments)
+            throws AMQInvalidArgumentException
+    {
+        Map<String, Object> oldArgs = _bindings.put(binding, newArguments);
+        MessageDestination destination = binding.getDestination();
+
+        if (FilterSupport.argumentsContainFilter(newArguments))
+        {
+            if (FilterSupport.argumentsContainFilter(oldArgs))
+            {
+                result.replaceDestinationFilter(destination,
+                                                FilterSupport.createMessageFilter(oldArgs, destination),
+                                                FilterSupport.createMessageFilter(newArguments, destination));
+            }
+            else
+            {
+                result.addFilteredDestination(destination, FilterSupport.createMessageFilter(newArguments, destination));
+                result.removeUnfilteredDestination(destination);
+            }
+        }
+        else if (FilterSupport.argumentsContainFilter(oldArgs))
+        {
+            result.addUnfilteredDestination(destination);
+            result.removeFilteredDestination(destination, FilterSupport.createMessageFilter(oldArgs, destination));
+        }
+        result.addBinding(binding, newArguments);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
index eb9bcaf..86b1f23 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
@@ -20,105 +20,69 @@
  */
 package org.apache.qpid.server.exchange.topic;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.qpid.server.exchange.AbstractExchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.model.Binding;
 
 public final class TopicExchangeResult implements TopicMatcherResult
 {
-    private final List<AbstractExchange.BindingIdentifier> _bindings = new CopyOnWriteArrayList<>();
-    private final Map<MessageDestination, Integer> _unfilteredQueues = new ConcurrentHashMap<>();
-    private final ConcurrentMap<MessageDestination, Map<FilterManager,Integer>> _filteredQueues = new ConcurrentHashMap<>();
-    private volatile ArrayList<MessageDestination> _unfilteredQueueList = new ArrayList<>(0);
+    private final Map<MessageDestination, Integer> _unfilteredDestinations = new ConcurrentHashMap<>();
+    private final ConcurrentMap<MessageDestination, Map<FilterManager,Integer>> _filteredDestinations = new ConcurrentHashMap<>();
+    private final Map<MessageDestination, String> _replacementKeys = new ConcurrentHashMap<>();
 
-    public void addUnfilteredQueue(MessageDestination queue)
+    public void addUnfilteredDestination(MessageDestination destination)
     {
-        Integer instances = _unfilteredQueues.get(queue);
-        if(instances == null)
+        _unfilteredDestinations.merge(destination, 1, (oldCount, increment) -> oldCount + increment);
+    }
+
+    public void removeUnfilteredDestination(MessageDestination destination)
+    {
+        Integer instances = _unfilteredDestinations.get(destination);
+        if(instances == 1)
         {
-            _unfilteredQueues.put(queue, 1);
-            ArrayList<MessageDestination> newList = new ArrayList<>(_unfilteredQueueList);
-            newList.add(queue);
-            _unfilteredQueueList = newList;
+            _unfilteredDestinations.remove(destination);
         }
         else
         {
-            _unfilteredQueues.put(queue, instances + 1);
+            _unfilteredDestinations.put(destination, instances - 1);
         }
     }
 
-    public void removeUnfilteredQueue(MessageDestination queue)
+    public void addBinding(AbstractExchange.BindingIdentifier binding, Map<String, Object> bindingArguments)
     {
-        Integer instances = _unfilteredQueues.get(queue);
-        if(instances == 1)
+        Object keyObject = bindingArguments.get(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY);
+        if (keyObject == null)
         {
-            _unfilteredQueues.remove(queue);
-            ArrayList<MessageDestination> newList = new ArrayList<>(_unfilteredQueueList);
-            newList.remove(queue);
-            _unfilteredQueueList = newList;
-
+            _replacementKeys.remove(binding.getDestination());
         }
         else
         {
-            _unfilteredQueues.put(queue,instances - 1);
+            _replacementKeys.put(binding.getDestination(), String.valueOf(keyObject));
         }
-
     }
 
-    public Collection<MessageDestination> getUnfilteredQueues()
-    {
-        return _unfilteredQueues.keySet();
-    }
-
-    public void addBinding(AbstractExchange.BindingIdentifier binding)
-    {
-        _bindings.add(binding);
-    }
-    
     public void removeBinding(AbstractExchange.BindingIdentifier binding)
     {
-        _bindings.remove(binding);
-    }
-    
-    public List<AbstractExchange.BindingIdentifier> getBindings()
-    {
-        return new ArrayList<>(_bindings);
+        _replacementKeys.remove(binding.getDestination());
     }
 
-    public void addFilteredQueue(MessageDestination queue, FilterManager filter)
+    public void addFilteredDestination(MessageDestination destination, FilterManager filter)
     {
-        Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
-        if(filters == null)
-        {
-            filters = new ConcurrentHashMap<>();
-            _filteredQueues.put(queue, filters);
-        }
-        Integer instances = filters.get(filter);
-        if(instances == null)
-        {
-            filters.put(filter,1);
-        }
-        else
-        {
-            filters.put(filter, instances + 1);
-        }
-
+        Map<FilterManager, Integer> filters =
+                _filteredDestinations.computeIfAbsent(destination, filterManagerMap -> new ConcurrentHashMap<>());
+        filters.merge(filter, 1, (oldCount, increment) -> oldCount + increment);
     }
 
-    public void removeFilteredQueue(MessageDestination queue, FilterManager filter)
+    public void removeFilteredDestination(MessageDestination destination, FilterManager filter)
     {
-        Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
+        Map<FilterManager,Integer> filters = _filteredDestinations.get(destination);
         if(filters != null)
         {
             Integer instances = filters.get(filter);
@@ -129,7 +93,7 @@ public final class TopicExchangeResult implements TopicMatcherResult
                     filters.remove(filter);
                     if(filters.isEmpty())
                     {
-                        _filteredQueues.remove(queue);
+                        _filteredDestinations.remove(destination);
                     }
                 }
                 else
@@ -142,11 +106,11 @@ public final class TopicExchangeResult implements TopicMatcherResult
 
     }
 
-    public void replaceQueueFilter(MessageDestination queue,
-                                   FilterManager oldFilter,
-                                   FilterManager newFilter)
+    public void replaceDestinationFilter(MessageDestination queue,
+                                         FilterManager oldFilter,
+                                         FilterManager newFilter)
     {
-        Map<FilterManager,Integer> filters = _filteredQueues.get(queue);
+        Map<FilterManager,Integer> filters = _filteredDestinations.get(queue);
         Map<FilterManager,Integer> newFilters = new ConcurrentHashMap<>(filters);
         Integer oldFilterInstances = filters.get(oldFilter);
         if(oldFilterInstances == 1)
@@ -166,45 +130,35 @@ public final class TopicExchangeResult implements TopicMatcherResult
         {
             newFilters.put(newFilter, newFilterInstances+1);
         }
-        _filteredQueues.put(queue,newFilters);
+        _filteredDestinations.put(queue, newFilters);
     }
 
-    public Collection<MessageDestination> processMessage(Filterable msg, Collection<MessageDestination> queues)
+    public Map<MessageDestination, String> processMessage(Filterable msg)
     {
-        if(queues == null)
-        {
-            if(_filteredQueues.isEmpty())
-            {
-                return _unfilteredQueueList;
-            }
-            else
-            {
-                queues = new HashSet<>();
-            }
-        }
-        else if(!(queues instanceof Set))
+        Map<MessageDestination, String> result = new HashMap<>();
+        for(MessageDestination unfilteredDestination: _unfilteredDestinations.keySet())
         {
-            queues = new HashSet<>(queues);
+            result.put(unfilteredDestination, _replacementKeys.get(unfilteredDestination));
         }
 
-        queues.addAll(_unfilteredQueues.keySet());
-        if(!_filteredQueues.isEmpty())
+        if(!_filteredDestinations.isEmpty())
         {
-            for(Map.Entry<MessageDestination, Map<FilterManager, Integer>> entry : _filteredQueues.entrySet())
+            for(Map.Entry<MessageDestination, Map<FilterManager, Integer>> entry : _filteredDestinations.entrySet())
             {
-                if(!queues.contains(entry.getKey()))
+                MessageDestination destination = entry.getKey();
+                if(!_unfilteredDestinations.containsKey(destination))
                 {
                     for(FilterManager filter : entry.getValue().keySet())
                     {
                         if(filter.allAllow(msg))
                         {
-                            queues.add(entry.getKey());
+                            result.put(destination, _replacementKeys.get(destination));
                         }
                     }
                 }
             }
         }
-        return queues;
+        return result;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
index 214ca23..bfe6fbe 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
@@ -35,7 +35,7 @@ public class TopicParser
     private static final String TOPIC_DELIMITER = "\\.";
 
     private final TopicWordDictionary _dictionary = new TopicWordDictionary();
-    private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<TopicMatcherDFAState>();
+    private final AtomicReference<TopicMatcherDFAState> _stateMachine = new AtomicReference<>();
 
     private static class Position
     {
@@ -46,7 +46,7 @@ public class TopicParser
         private boolean _followedByAnyLoop;
 
 
-        public Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
+        private Position(final int position, final TopicWord word, final boolean selfTransition, final boolean endState)
         {
             _position = position;
             _word = word;
@@ -55,32 +55,32 @@ public class TopicParser
         }
 
 
-        public TopicWord getWord()
+        private TopicWord getWord()
         {
             return _word;
         }
 
-        public boolean isSelfTransition()
+        private boolean isSelfTransition()
         {
             return _selfTransition;
         }
 
-        public int getPosition()
+        private int getPosition()
         {
             return _position;
         }
 
-        public boolean isEndState()
+        private boolean isEndState()
         {
             return _endState;
         }
 
-        public boolean isFollowedByAnyLoop()
+        private boolean isFollowedByAnyLoop()
         {
             return _followedByAnyLoop;
         }
 
-        public void setFollowedByAnyLoop(boolean followedByAnyLoop)
+        private void setFollowedByAnyLoop(boolean followedByAnyLoop)
         {
             _followedByAnyLoop = followedByAnyLoop;
         }
@@ -123,7 +123,7 @@ public class TopicParser
         TopicMatcherDFAState stateMachine = _stateMachine.get();
         if(stateMachine == null)
         {
-            return Collections.EMPTY_SET;
+            return Collections.emptySet();
         }
         else
         {
@@ -132,7 +132,7 @@ public class TopicParser
     }
 
 
-    TopicMatcherDFAState createStateMachine(String bindingKey, TopicMatcherResult result)
+    private TopicMatcherDFAState createStateMachine(String bindingKey, TopicMatcherResult result)
     {
         List<TopicWord> wordList = createTopicWordList(bindingKey);
         int wildCards = 0;
@@ -146,17 +146,17 @@ public class TopicParser
         if(wildCards == 0)
         {
             TopicMatcherDFAState[] states = new TopicMatcherDFAState[wordList.size()+1];
-            states[states.length-1] = new TopicMatcherDFAState(Collections.EMPTY_MAP, Collections.singleton(result));
+            states[states.length-1] = new TopicMatcherDFAState(Collections.emptyMap(), Collections.singleton(result));
             for(int i = states.length-2; i >= 0; i--)
             {
-                states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.EMPTY_SET);
+                states[i] = new TopicMatcherDFAState(Collections.singletonMap(wordList.get(i),states[i+1]),Collections.emptySet());
 
             }
             return states[0];
         }
         else if(wildCards == wordList.size())
         {
-            Map<TopicWord,TopicMatcherDFAState> stateMap = new HashMap<TopicWord,TopicMatcherDFAState>();
+            Map<TopicWord,TopicMatcherDFAState> stateMap = new HashMap<>();
             TopicMatcherDFAState state = new TopicMatcherDFAState(stateMap, Collections.singleton(result));
             stateMap.put(TopicWord.ANY_WORD, state);
             return state;
@@ -232,7 +232,7 @@ public class TopicParser
         // we approach this by examining steps of increasing length - so we
         // look how far we can go from the start position in 1 word, 2 words, etc...
 
-        Map<Set<Position>,SimpleState> stateMap = new HashMap<Set<Position>,SimpleState>();
+        Map<Set<Position>,SimpleState> stateMap = new HashMap<>();
 
 
         SimpleState state = new SimpleState();
@@ -243,7 +243,7 @@ public class TopicParser
 
         SimpleState[] simpleStates = stateMap.values().toArray(new SimpleState[stateMap.size()]);
         HashMap<TopicWord, TopicMatcherDFAState>[] dfaStateMaps = new HashMap[simpleStates.length];
-        Map<SimpleState, TopicMatcherDFAState> simple2DFAMap = new HashMap<SimpleState, TopicMatcherDFAState>();
+        Map<SimpleState, TopicMatcherDFAState> simple2DFAMap = new HashMap<>();
 
         for(int i = 0; i < simpleStates.length; i++)
         {
@@ -266,10 +266,10 @@ public class TopicParser
             }
             else
             {
-                results = Collections.EMPTY_SET;
+                results = Collections.emptySet();
             }
 
-            dfaStateMaps[i] = new HashMap<TopicWord, TopicMatcherDFAState>();
+            dfaStateMaps[i] = new HashMap<>();
             simple2DFAMap.put(simpleStates[i], new TopicMatcherDFAState(dfaStateMaps[i],results));
 
         }
@@ -295,7 +295,7 @@ public class TopicParser
                                      final Map<Set<Position>, SimpleState> stateMap,
                                      final Position[] positions)
     {
-        Map<TopicWord, Set<Position>> transitions = new HashMap<TopicWord,Set<Position>>();
+        Map<TopicWord, Set<Position>> transitions = new HashMap<>();
 
         for(Position pos : state._positions)
         {
@@ -304,7 +304,7 @@ public class TopicParser
                 Set<Position> dest = transitions.get(TopicWord.ANY_WORD);
                 if(dest == null)
                 {
-                    dest = new HashSet<Position>();
+                    dest = new HashSet<>();
                     transitions.put(TopicWord.ANY_WORD,dest);
                 }
                 dest.add(pos);
@@ -316,7 +316,7 @@ public class TopicParser
             Set<Position> dest = transitions.get(pos.getWord());
             if(dest == null)
             {
-                dest = new HashSet<Position>();
+                dest = new HashSet<>();
                 transitions.put(pos.getWord(),dest);
             }
             dest.add(nextPosition);
@@ -332,7 +332,7 @@ public class TopicParser
             }
         }
 
-        state._nextState = new HashMap<TopicWord, SimpleState>();
+        state._nextState = new HashMap<>();
 
         for(Map.Entry<TopicWord,Set<Position>> dest : transitions.entrySet())
         {
@@ -370,7 +370,7 @@ public class TopicParser
                 }
                 if(anyLoop != null)
                 {
-                    Collection<Position> removals = new ArrayList<Position>();
+                    Collection<Position> removals = new ArrayList<>();
                     for(Position destPos : dest.getValue())
                     {
                         if(destPos.getPosition() < anyLoop.getPosition())
@@ -402,7 +402,7 @@ public class TopicParser
         SimpleState anyWordState = state._nextState.get(TopicWord.ANY_WORD);
         if(anyWordState != null)
         {
-            List<TopicWord> removeList = new ArrayList<TopicWord>();
+            List<TopicWord> removeList = new ArrayList<>();
             for(Map.Entry<TopicWord,SimpleState> entry : state._nextState.entrySet())
             {
                 if(entry.getValue() == anyWordState && entry.getKey() != TopicWord.ANY_WORD)
@@ -424,7 +424,7 @@ public class TopicParser
         String[] tokens = bindingKey.split(TOPIC_DELIMITER);
         TopicWord previousWord = null;
 
-        List<TopicWord> wordList = new ArrayList<TopicWord>();
+        List<TopicWord> wordList = new ArrayList<>();
 
         for(String token : tokens)
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
index bb18bcc..7f0979e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.message;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -177,6 +178,16 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
         return refusalMessages.toString();
     }
 
+    public int getNumberOfRoutes()
+    {
+        return _queues.size();
+    }
+
+    public Collection<BaseQueue> getRoutes()
+    {
+        return Collections.unmodifiableCollection(_queues);
+    }
+
     private static class RejectReason
     {
         private final RejectType _rejectType;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java b/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
index 5c1a2d2..8f6089f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
@@ -25,6 +25,7 @@ import java.util.Map;
 @ManagedAttributeValueType
 public interface Binding extends PublishingLink
 {
+    String BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY = "x-replacement-routing-key";
 
     String TYPE = "binding";
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index 2b5614b..3dd62b5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -104,7 +104,8 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
                  @Param(name = "arguments", defaultValue = "{}") Map<String, Object> arguments,
                  @Param(name = "replaceExistingArguments", defaultValue = "false") boolean replaceExistingArguments);
 
-    @ManagedOperation(changesConfiguredObjectState = true)
+    @ManagedOperation(changesConfiguredObjectState = true,
+                      description = "Deletes all the bindings matching the given destination and bindingKey")
     boolean unbind(@Param(name="destination", mandatory = true) String destination,
                    @Param(name="bindingKey") String bindingKey);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-broker-j git commit: QPID-7771: [Java Broker] Add support for binding argument 'x-replacement-routing-key'

Posted by or...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
index 73a4597..b39e959 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
@@ -19,25 +19,38 @@
  */
 package org.apache.qpid.server.exchange;
 
+import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.RoutingResult;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AlternateBinding;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class DirectExchangeTest extends QpidTestCase
 {
-    private DirectExchangeImpl _exchange;
+    private DirectExchange<?> _exchange;
     private VirtualHost<?> _vhost;
+    private InstanceProperties _instanceProperties;
+    private ServerMessage<?> _messageWithNoHeaders;
 
     @Override
     public void setUp() throws Exception
@@ -51,8 +64,10 @@ public class DirectExchangeTest extends QpidTestCase
         attributes.put(Exchange.DURABLE, false);
         attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
 
-        _exchange = (DirectExchangeImpl) _vhost.createChild(Exchange.class, attributes);
-        _exchange.open();
+        _exchange = (DirectExchange<?>) _vhost.createChild(Exchange.class, attributes);
+
+        _instanceProperties = mock(InstanceProperties.class);
+        _messageWithNoHeaders = createTestMessage(Collections.emptyMap());
     }
 
     @Override
@@ -128,7 +143,7 @@ public class DirectExchangeTest extends QpidTestCase
         attributes.put(Queue.DURABLE, false);
         attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName()));
 
-        Queue queue = (Queue) _vhost.createChild(Queue.class, attributes);
+        Queue queue = _vhost.createChild(Queue.class, attributes);
         queue.open();
 
         assertEquals("Unexpected alternate exchange on queue", _exchange, queue.getAlternateBindingDestination());
@@ -221,4 +236,197 @@ public class DirectExchangeTest extends QpidTestCase
                      _exchange.getName(),
                      newExchange.getAlternateBinding().getDestination());
     }
+
+    public void testRouteToQueue()
+    {
+        String boundKey = "key";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, boundKey,
+                                                                                       _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue before bind", result.hasRoutes());
+
+        boolean bind = _exchange.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
+        assertTrue("Bind operation should be successful", bind);
+
+        result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue after bind", result.hasRoutes());
+
+        boolean unbind = _exchange.unbind(queue.getName(), boundKey);
+        assertTrue("Unbind operation should be successful", unbind);
+
+        result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue after unbind", result.hasRoutes());
+    }
+
+    public void testRouteToQueueWithSelector()
+    {
+        String boundKey = "key";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        InstanceProperties instanceProperties = mock(InstanceProperties.class);
+        ServerMessage<?> matchingMessage = createTestMessage(Collections.singletonMap("prop", true));
+        ServerMessage<?> unmatchingMessage = createTestMessage(Collections.singletonMap("prop", false));
+
+        boolean bind = _exchange.bind(queue.getName(), boundKey,
+                                      Collections.singletonMap(JMS_SELECTOR.toString(), "prop = True"),
+                                      false);
+        assertTrue("Bind operation should be successful", bind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(matchingMessage, boundKey, instanceProperties);
+        assertTrue("Message with matching selector not routed to queue", result.hasRoutes());
+
+        result = _exchange.route(unmatchingMessage, boundKey, instanceProperties);
+        assertFalse("Message with matching selector unexpectedly routed to queue", result.hasRoutes());
+
+        boolean unbind = _exchange.unbind(queue.getName(), boundKey);
+        assertTrue("Unbind operation should be successful", unbind);
+
+        result = _exchange.route(matchingMessage, boundKey, instanceProperties);
+        assertFalse("Message with matching selector unexpectedly routed to queue after unbind", result.hasRoutes());
+    }
+
+    public void testRouteToQueueViaTwoExchanges()
+    {
+        String boundKey = "key";
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _vhost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        boolean exchToViaBind = _exchange.bind(via.getName(), boundKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       boundKey,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+    }
+
+
+    public void testDestinationDeleted()
+    {
+        String boundKey = "key";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        assertFalse(_exchange.isBound(boundKey));
+        assertFalse(_exchange.isBound(boundKey, queue));
+        assertFalse(_exchange.isBound(queue));
+
+        _exchange.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
+
+        assertTrue(_exchange.isBound(boundKey));
+        assertTrue(_exchange.isBound(boundKey, queue));
+        assertTrue(_exchange.isBound(queue));
+
+        queue.delete();
+
+        assertFalse(_exchange.isBound(boundKey));
+        assertFalse(_exchange.isBound(boundKey, queue));
+        assertFalse(_exchange.isBound(queue));
+    }
+
+    private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
+    {
+        AMQMessageHeader header = mock(AMQMessageHeader.class);
+        headerValues.forEach((key, value) -> when(header.getHeader(key)).thenReturn(value));
+
+        @SuppressWarnings("unchecked")
+        ServerMessage<?> message = mock(ServerMessage.class);
+        when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        when(message.getMessageHeader()).thenReturn(header);
+        return message;
+    }
+
+    public void testRouteToMultipleQueues()
+    {
+        String boundKey = "key";
+        Queue<?> queue1 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue1"));
+        Queue<?> queue2 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue2"));
+
+        boolean bind1 = _exchange.bind(queue1.getName(), boundKey, Collections.emptyMap(), false);
+        assertTrue("Bind operation to queue1 should be successful", bind1);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
+
+        _exchange.bind(queue2.getName(), boundKey, Collections.singletonMap(JMS_SELECTOR.toString(), "prop is null"), false);
+
+        result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 2, result.getNumberOfRoutes());
+
+        _exchange.unbind(queue1.getName(), boundKey);
+
+        result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
+
+        _exchange.unbind(queue2.getName(), boundKey);
+        result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
+    }
+
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKey()
+    {
+        Map<String, Object> viaExchangeArguments = new HashMap<>();
+        viaExchangeArguments.put(Exchange.NAME, "via_exchange");
+        viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _vhost.createChild(Exchange.class, viaExchangeArguments);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               "key1",
+                                               Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "key2"),
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), "key2", Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "key1",
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+    }
+
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKeyAndFiltering()
+    {
+        Map<String, Object> viaExchangeArguments = new HashMap<>();
+        viaExchangeArguments.put(Exchange.NAME, getTestName() + "_via_exch");
+        viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _vhost.createChild(Exchange.class, viaExchangeArguments);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+
+        Map<String, Object> exchToViaBindArguments = new HashMap<>();
+        exchToViaBindArguments.put(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "key2");
+        exchToViaBindArguments.put(JMS_SELECTOR.toString(), "prop = True");
+
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               "key1",
+                                               exchToViaBindArguments,
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), "key2", Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(createTestMessage(Collections.singletonMap("prop", true)),
+                                                                                                            "key1",
+                                                                                                            _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+
+        result = _exchange.route(createTestMessage(Collections.singletonMap("prop", false)),
+                                 "key1",
+                                 _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index 1d7a3b0..cb4cadf 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -20,342 +20,345 @@
  */
 package org.apache.qpid.server.exchange;
 
+import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anySet;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.apache.qpid.server.filter.AMQPFilterTypes;
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class FanoutExchangeTest extends QpidTestCase
 {
-    private FanoutExchangeImpl _exchange;
-    private QueueManagingVirtualHost _virtualHost;
-    private TaskExecutor _taskExecutor;
+    private FanoutExchange<?> _exchange;
+    private VirtualHost<?> _vhost;
+    private InstanceProperties _instanceProperties;
+    private ServerMessage<?> _messageWithNoHeaders;
 
     @Override
-    public void setUp()
+    public void setUp() throws Exception
     {
-        Map<String,Object> attributes = new HashMap<String, Object>();
-        attributes.put(Exchange.ID, UUID.randomUUID());
+        super.setUp();
+
+        BrokerTestHelper.setUp();
+        _vhost = BrokerTestHelper.createVirtualHost(getName());
+
+        Map<String,Object> attributes = new HashMap<>();
         attributes.put(Exchange.NAME, "test");
         attributes.put(Exchange.DURABLE, false);
+        attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
 
-        Broker broker = mock(Broker.class);
-        when(broker.getCategoryClass()).thenReturn(Broker.class);
-        when(broker.getModel()).thenReturn(BrokerModel.getInstance());
-
-        VirtualHostNode virtualHostNode = mock(VirtualHostNode.class);
-        when(virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class);
-        when(virtualHostNode.getParent()).thenReturn(broker);
-        when(virtualHostNode.getModel()).thenReturn(BrokerModel.getInstance());
-
-        _taskExecutor = new CurrentThreadTaskExecutor();
-        _taskExecutor.start();
-        _virtualHost = mock(QueueManagingVirtualHost.class);
-
-        when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
-        when(_virtualHost.getState()).thenReturn(State.ACTIVE);
-        when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor);
-        when(_virtualHost.getChildExecutor()).thenReturn(_taskExecutor);
-        when(_virtualHost.getModel()).thenReturn(BrokerModel.getInstance());
-        when(_virtualHost.getParent()).thenReturn(virtualHostNode);
-        when(_virtualHost.getCategoryClass()).thenReturn(VirtualHost.class);
-        _exchange = new FanoutExchangeImpl(attributes, _virtualHost);
+        _exchange = (FanoutExchange<?>) _vhost.createChild(Exchange.class, attributes);
         _exchange.open();
+
+        _instanceProperties = mock(InstanceProperties.class);
+        _messageWithNoHeaders = createTestMessage(Collections.emptyMap());
     }
 
     @Override
-
     public void tearDown() throws Exception
     {
-        super.tearDown();
-        _taskExecutor.stop();
+        try
+        {
+            if (_vhost != null)
+            {
+                _vhost.close();
+            }
+        }
+        finally
+        {
+            BrokerTestHelper.tearDown();
+            super.tearDown();
+        }
     }
 
-    public void testIsBoundStringMapAMQQueueWhenQueueIsNull()
+    public void testRouteToQueue() throws Exception
     {
-        assertFalse("calling isBound(AMQShortString,FieldTable,Queue<?>) with null queue should return false",
-                _exchange.isBound((String) null, (Map) null, (Queue<?>) null));
-    }
+        String bindingKey = "mybinding";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-    public void testIsBoundStringAMQQueueWhenQueueIsNull()
-    {
-        assertFalse("calling isBound(AMQShortString,Queue<?>) with null queue should return false",
-                _exchange.isBound((String) null, (Queue<?>) null));
-    }
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, null,
+                                                                                       _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue before bind", result.hasRoutes());
 
-    public void testIsBoundAMQQueueWhenQueueIsNull()
-    {
-        assertFalse("calling isBound(AMQQueue) with null queue should return false", _exchange.isBound((Queue<?>) null));
-    }
+        boolean bind = _exchange.bind(queue.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Bind operation should be successful", bind);
 
-    public void testIsBoundStringMapAMQQueue()
-    {
-        Queue<?> queue = bindQueue("matters");
-        assertTrue("Should return true for a bound queue",
-                _exchange.isBound("matters", null, queue));
-    }
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertTrue("Message not routed to queue after bind", result.hasRoutes());
+
+        boolean unbind = _exchange.unbind(queue.getName(), bindingKey);
+        assertTrue("Unbind operation should be successful", unbind);
+
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue after unbind", result.hasRoutes());
 
-    public void testIsBoundStringAMQQueue()
-    {
-        Queue<?> queue = bindQueue("matters");
-        assertTrue("Should return true for a bound queue",
-                _exchange.isBound("matters", queue));
     }
 
-    public void testIsBoundAMQQueue()
+    public void testRouteToQueueWithSelector()
     {
-        Queue<?> queue = bindQueue("matters");
-        assertTrue("Should return true for a bound queue",
-                _exchange.isBound(queue));
-    }
+        String bindingKey = "mybinding";
 
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-    public void testRouteToDestination() throws Exception
-    {
-        List<? extends BaseQueue> result;
-        Queue<?> queue = mockQueue();
+        InstanceProperties instanceProperties = mock(InstanceProperties.class);
+        ServerMessage<?> matchingMessage = createTestMessage(Collections.singletonMap("prop", true));
+        ServerMessage<?> unmatchingMessage = createTestMessage(Collections.singletonMap("prop", false));
 
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange without bindings routed message to unexpected number of queues", 0, result.size());
+        boolean bind = _exchange.bind(queue.getName(), bindingKey,
+                                      Collections.singletonMap(JMS_SELECTOR.toString(), "prop = True"),
+                                      false);
+        assertTrue("Bind operation should be successful", bind);
 
-        _exchange.addBinding("key", queue, null);
+        RoutingResult<ServerMessage<?>> result = _exchange.route(matchingMessage, null, instanceProperties);
+        assertTrue("Message with matching selector not routed to queue", result.hasRoutes());
 
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange with 1 binding routed message to unexpected number of queues", 1, result.size());
+        result = _exchange.route(unmatchingMessage, null, instanceProperties);
+        assertFalse("Message without matching selector unexpectedly routed to queue", result.hasRoutes());
 
-        _exchange.deleteBinding("key", queue);
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange with no bindings routed message to unexpected number of queues", 0, result.size());
+        boolean unbind = _exchange.unbind(queue.getName(), bindingKey);
+        assertTrue("Unbind operation should be successful", unbind);
+
+        result = _exchange.route(matchingMessage, null, instanceProperties);
+        assertFalse("Message with matching selector unexpectedly routed to queue after unbind", result.hasRoutes());
     }
 
-    public void testDestinationRemoved() throws Exception
+    public void testRouteToQueueViaTwoExchanges()
     {
-        List<? extends BaseQueue> result;
-        Queue<?> queue = mockQueue();
+        String bindingKey = "key";
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
 
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange without bindings routed message to unexpected number of queues", 0, result.size());
+        Exchange via = _vhost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        _exchange.addBinding("key", queue, null);
+        boolean exchToViaBind = _exchange.bind(via.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
 
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange with 1 binding routed message to unexpected number of queues", 1, result.size());
+        boolean viaToQueueBind = via.bind(queue.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
 
-        _exchange.destinationRemoved(queue);
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange with no bindings routed message to unexpected number of queues", 0, result.size());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       null,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
     }
 
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKey()
+    {
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _vhost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               "key",
+                                               Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "key1"),
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), "key1", Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       null,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+    }
 
-    public void testRoutingWithSelectors() throws Exception
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKeyAndFiltering()
     {
-        Queue<?> queue = mockQueue();
+        Map<String, Object> viaExchangeArguments = new HashMap<>();
+        viaExchangeArguments.put(Exchange.NAME, getTestName() + "_via_exch");
+        viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
 
-        List<? extends BaseQueue> result;
+        Exchange via = _vhost.createChild(Exchange.class, viaExchangeArguments);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        _exchange.addBinding("key2", queue, Collections.<String, Object>singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),"prop = True"));
 
-        result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
+        Map<String, Object> exchToViaBindArguments = new HashMap<>();
+        exchToViaBindArguments.put(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "key2");
+        exchToViaBindArguments.put(JMS_SELECTOR.toString(), "prop = True");
 
-        assertEquals("Expected matching message to be routed to queue", 1, result.size());
-        assertTrue("Expected matching message to be routed to queue", result.contains(queue));
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               "key1",
+                                               exchToViaBindArguments,
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
 
-        result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
+        boolean viaToQueueBind = via.bind(queue.getName(), "key2", Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
 
-        assertEquals("Expected non matching message not to be routed to queue", 0, result.size());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(createTestMessage(Collections.singletonMap("prop", true)),
+                                                                                       "key1",
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+
+        result = _exchange.route(createTestMessage(Collections.singletonMap("prop", false)),
+                                 "key1",
+                                 _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
     }
 
-    public void testMultipleBindings() throws Exception
+    public void testRouteToMultipleQueue()
     {
-        Queue<?> queue1 = mockQueue();
-        Queue<?> queue2 = mockQueue();
+        String bindingKey = "key";
+        Queue<?> queue1 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue1"));
+        Queue<?> queue2 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue2"));
 
-        List<? extends BaseQueue> result;
+        boolean bind1 = _exchange.bind(queue1.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Bind operation to queue1 should be successful", bind1);
 
-        _exchange.addBinding("key", queue1, null);
-        _exchange.addBinding("key", queue2, null);
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-        result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
+        _exchange.bind(queue2.getName(), bindingKey, Collections.singletonMap(JMS_SELECTOR.toString(), "prop is null"), false);
 
-        assertEquals("Expected message to be routed to both queues", 2, result.size());
-        assertTrue("Expected queue1 to be in routing result", result.contains(queue1));
-        assertTrue("Expected queue2 to be in routing result", result.contains(queue2));
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 2, result.getNumberOfRoutes());
 
-        _exchange.addBinding("key1", queue2, null);
+        _exchange.unbind(queue1.getName(), bindingKey);
 
-        result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-        assertEquals("Expected message to be routed to both queues", 2, result.size());
-        assertTrue("Expected queue1 to be in routing result", result.contains(queue1));
-        assertTrue("Expected queue2 to be in routing result", result.contains(queue2));
+        _exchange.unbind(queue2.getName(), bindingKey);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
     }
 
-    private Queue<?> bindQueue(final String bindingKey)
+    public void testRouteToQueueBoundMultipleTimesUsingTheSameBindingKey()
     {
-        Queue<?> queue = mockQueue();
+        String bindingKey = "key";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        _exchange.addBinding(bindingKey, queue, null);
-        return queue;
-    }
+        boolean bind1 = _exchange.bind(queue.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Bind operation to queue1 should be successful", bind1);
 
-    private Queue<?> mockQueue()
-    {
-        Queue queue = mock(Queue.class);
-        String name = UUID.randomUUID().toString();
-        when(queue.getName()).thenReturn(name);
-        when(queue.getVirtualHost()).thenReturn(_virtualHost);
-        when(queue.getCategoryClass()).thenReturn(Queue.class);
-        when(queue.getModel()).thenReturn(BrokerModel.getInstance());
-        TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
-        when(queue.getTaskExecutor()).thenReturn(taskExecutor);
-        when(queue.getChildExecutor()).thenReturn(taskExecutor);
-        when(queue.getParent()).thenReturn(_virtualHost);
-        when(_virtualHost.getAttainedQueue(eq(name))).thenReturn(queue);
-        RoutingResult result = new RoutingResult(null);
-        result.addQueue(queue);
-        when(queue.route(any(ServerMessage.class),anyString(),any(InstanceProperties.class))).thenReturn(result);
-        return queue;
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
+
+        boolean bind2 = _exchange.bind(queue.getName(), bindingKey, Collections.emptyMap(), true);
+        assertTrue("Bind operation to queue1 should be successful", bind2);
+
+        RoutingResult<ServerMessage<?>> result2 = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result2.getNumberOfRoutes());
+
+        _exchange.unbind(queue.getName(), bindingKey);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
     }
 
-    private List<? extends BaseQueue> routeToQueues(final ServerMessage message,
-                                                    final String routingAddress,
-                                                    final InstanceProperties instanceProperties)
+    public void testRouteToQueueBoundMultipleTimesUsingDifferentBindingKeys()
     {
-        RoutingResult result = _exchange.route(message, routingAddress, instanceProperties);
-        final List<BaseQueue> resultQueues = new ArrayList<>();
-        result.send(new ServerTransaction()
-        {
-            @Override
-            public long getTransactionStartTime()
-            {
-                return 0;
-            }
+        String bindingKey1 = "key1";
+        String bindingKey2 = "key2";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-            @Override
-            public long getTransactionUpdateTime()
-            {
-                return 0;
-            }
+        boolean bind1 = _exchange.bind(queue.getName(), bindingKey1, Collections.emptyMap(), false);
+        assertTrue("Bind operation to queue1 should be successful", bind1);
 
-            @Override
-            public void addPostTransactionAction(final Action postTransactionAction)
-            {
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-            }
+        boolean bind2 = _exchange.bind(queue.getName(), bindingKey2, Collections.emptyMap(), true);
+        assertTrue("Bind operation to queue1 should be successful", bind2);
 
-            @Override
-            public void dequeue(final MessageEnqueueRecord record, final Action postTransactionAction)
-            {
+        RoutingResult<ServerMessage<?>> result2 = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result2.getNumberOfRoutes());
 
-            }
+        _exchange.unbind(queue.getName(), bindingKey1);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-            @Override
-            public void dequeue(final Collection<MessageInstance> messages, final Action postTransactionAction)
-            {
+        _exchange.unbind(queue.getName(), bindingKey2);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
+    }
 
-            }
+    public void testRouteToQueueBoundMultipleTimesUsingFilteredAndUnfilteredBindings()
+    {
+        String bindingKey1 = "key1";
+        String bindingKey2 = "key2";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-            @Override
-            public void enqueue(final TransactionLogResource queue,
-                                final EnqueueableMessage message,
-                                final EnqueueAction postTransactionAction)
-            {
-                resultQueues.add((BaseQueue) queue);
-            }
+        Map<String, Object> argumentsWithFilter = Collections.singletonMap(JMS_SELECTOR.toString(), "prop = True");
+        boolean bind1 = _exchange.bind(queue.getName(), bindingKey1,
+                                       argumentsWithFilter, false);
+        assertTrue("Bind operation to queue1 should be successful", bind1);
 
-            @Override
-            public void enqueue(final Collection<? extends BaseQueue> queues,
-                                final EnqueueableMessage message,
-                                final EnqueueAction postTransactionAction)
-            {
-                resultQueues.addAll(queues);
-            }
+        final ServerMessage<?> messageMatchingSelector =
+                createTestMessage(Collections.singletonMap("prop", true));
+        RoutingResult<ServerMessage<?>> result = _exchange.route(messageMatchingSelector, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-            @Override
-            public void commit()
-            {
+        boolean bind2 = _exchange.bind(queue.getName(), bindingKey2, Collections.emptyMap(), true);
+        assertTrue("Bind operation to queue1 should be successful", bind2);
 
-            }
+        RoutingResult<ServerMessage<?>> result2 = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result2.getNumberOfRoutes());
 
-            @Override
-            public void commit(final Runnable immediatePostTransactionAction)
-            {
+        _exchange.unbind(queue.getName(), bindingKey2);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
 
-            }
+        result = _exchange.route(messageMatchingSelector, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-            @Override
-            public void rollback()
-            {
+        _exchange.unbind(queue.getName(), bindingKey1);
+        result = _exchange.route(messageMatchingSelector, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
+    }
 
-            }
+    public void testIsBound()
+    {
+        String boundKey = "key";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-            @Override
-            public boolean isTransactional()
-            {
-                return false;
-            }
-        }, null);
+        assertFalse(_exchange.isBound(boundKey));
+        assertFalse(_exchange.isBound(boundKey, queue));
+        assertFalse(_exchange.isBound(queue));
 
-        return resultQueues;
+        _exchange.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
+
+        assertTrue(_exchange.isBound(boundKey));
+        assertTrue(_exchange.isBound(boundKey, queue));
+        assertTrue(_exchange.isBound(queue));
+
+        queue.delete();
+
+        assertFalse(_exchange.isBound(boundKey));
+        assertFalse(_exchange.isBound(boundKey, queue));
+        assertFalse(_exchange.isBound(queue));
     }
 
-    private ServerMessage mockMessage(boolean propValue)
+    private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
     {
-        final AMQMessageHeader header = mock(AMQMessageHeader.class);
-        when(header.containsHeader("prop")).thenReturn(true);
-        when(header.getHeader("prop")).thenReturn(propValue);
-        when(header.getHeaderNames()).thenReturn(Collections.singleton("prop"));
-        when(header.containsHeaders(anySet())).then(new Answer<Object>()
-        {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable
-            {
-                final Set names = (Set) invocation.getArguments()[0];
-                return names.size() == 1 && names.contains("select");
-
-            }
-        });
-        final ServerMessage serverMessage = mock(ServerMessage.class);
-        when(serverMessage.getMessageHeader()).thenReturn(header);
-        when(serverMessage.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        return serverMessage;
+        AMQMessageHeader header = mock(AMQMessageHeader.class);
+        headerValues.forEach((key, value) -> when(header.getHeader(key)).thenReturn(value));
+
+        @SuppressWarnings("unchecked")
+        ServerMessage<?> message = mock(ServerMessage.class);
+        when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        when(message.getMessageHeader()).thenReturn(header);
+        return message;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index ca7f46e..ced6924 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -20,193 +20,84 @@
  */
 package org.apache.qpid.server.exchange;
 
+import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anySet;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.qpid.server.filter.AMQPFilterTypes;
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class HeadersExchangeTest extends QpidTestCase
 {
-    private HeadersExchangeImpl _exchange;
-    private QueueManagingVirtualHost _virtualHost;
-    private TaskExecutor _taskExecutor;
-    private ConfiguredObjectFactoryImpl _factory;
+    private HeadersExchange<?> _exchange;
+    private QueueManagingVirtualHost<?> _virtualHost;
+    private InstanceProperties _instanceProperties;
+    private ServerMessage<?> _messageWithNoHeaders;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
 
-        _taskExecutor = new CurrentThreadTaskExecutor();
-        _taskExecutor.start();
-        _virtualHost = mock(QueueManagingVirtualHost.class);
-
-        Broker broker = mock(Broker.class);
-        when(broker.getCategoryClass()).thenReturn(Broker.class);
-        when(broker.getModel()).thenReturn(BrokerModel.getInstance());
-
-        VirtualHostNode virtualHostNode = mock(VirtualHostNode.class);
-        when(virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class);
-        when(virtualHostNode.getParent()).thenReturn(broker);
-        when(virtualHostNode.getModel()).thenReturn(BrokerModel.getInstance());
-
-        when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
-        when(_virtualHost.getCategoryClass()).thenReturn(VirtualHost.class);
-        when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor);
-        when(_virtualHost.getChildExecutor()).thenReturn(_taskExecutor);
-        when(_virtualHost.getState()).thenReturn(State.ACTIVE);
-
-        _factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
-        when(_virtualHost.getObjectFactory()).thenReturn(_factory);
-        when(_virtualHost.getModel()).thenReturn(_factory.getModel());
-        when(_virtualHost.getParent()).thenReturn(virtualHostNode);
-        Map<String,Object> attributes = new HashMap<String, Object>();
-        attributes.put(Exchange.ID, UUID.randomUUID());
+        _virtualHost = BrokerTestHelper.createVirtualHost("test");
+
+        Map<String,Object> attributes = new HashMap<>();
         attributes.put(Exchange.NAME, "test");
         attributes.put(Exchange.DURABLE, false);
+        attributes.put(Exchange.TYPE, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
 
-        _exchange = new HeadersExchangeImpl(attributes, _virtualHost);
+        _exchange = (HeadersExchange) _virtualHost.createChild(Exchange.class, attributes);
+
+        _instanceProperties = mock(InstanceProperties.class);
+        _messageWithNoHeaders = createTestMessage(Collections.emptyMap());
 
     }
 
     @Override
     public void tearDown() throws Exception
     {
+        if (_virtualHost  != null)
+        {
+            _virtualHost.close();
+        }
         super.tearDown();
-        _taskExecutor.stop();
+
     }
 
-    protected void routeAndTest(ServerMessage msg, Queue<?>... expected) throws Exception
+    private void routeAndTest(ServerMessage msg, Queue<?>... expected) throws Exception
     {
-        List<? extends BaseQueue> results = routeToQueues(msg, "", InstanceProperties.EMPTY);
-        List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
+        RoutingResult<?> result = _exchange.route(msg, "", InstanceProperties.EMPTY);
+        Collection<BaseQueue> results = result.getRoutes();
+        List<BaseQueue> unexpected = new ArrayList<>(results);
         unexpected.removeAll(Arrays.asList(expected));
         assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
-        List<? extends BaseQueue> missing = new ArrayList<BaseQueue>(Arrays.asList(expected));
+        List<BaseQueue> missing = new ArrayList<>(Arrays.asList(expected));
         missing.removeAll(results);
         assertTrue("Message not delivered to expected queues: " + missing, missing.isEmpty());
-        assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size());
-    }
-
-    private List<? extends BaseQueue> routeToQueues(final ServerMessage message,
-                                                    final String routingAddress,
-                                                    final InstanceProperties instanceProperties)
-    {
-        RoutingResult result = _exchange.route(message, routingAddress, instanceProperties);
-        final List<BaseQueue> resultQueues = new ArrayList<>();
-        result.send(new ServerTransaction()
-        {
-            @Override
-            public long getTransactionStartTime()
-            {
-                return 0;
-            }
-
-            @Override
-            public long getTransactionUpdateTime()
-            {
-                return 0;
-            }
-
-            @Override
-            public void addPostTransactionAction(final Action postTransactionAction)
-            {
-
-            }
-
-            @Override
-            public void dequeue(final MessageEnqueueRecord record, final Action postTransactionAction)
-            {
-
-            }
-
-            @Override
-            public void dequeue(final Collection<MessageInstance> messages, final Action postTransactionAction)
-            {
-
-            }
-
-            @Override
-            public void enqueue(final TransactionLogResource queue,
-                                final EnqueueableMessage message,
-                                final EnqueueAction postTransactionAction)
-            {
-                resultQueues.add((BaseQueue) queue);
-            }
-
-            @Override
-            public void enqueue(final Collection<? extends BaseQueue> queues,
-                                final EnqueueableMessage message,
-                                final EnqueueAction postTransactionAction)
-            {
-                resultQueues.addAll(queues);
-            }
-
-            @Override
-            public void commit()
-            {
-
-            }
-
-            @Override
-            public void commit(final Runnable immediatePostTransactionAction)
-            {
-
-            }
-
-            @Override
-            public void rollback()
-            {
-
-            }
-
-            @Override
-            public boolean isTransactional()
-            {
-                return false;
-            }
-        }, null);
-
-        return resultQueues;
+        assertTrue("Duplicates " + results, results.size()==(new HashSet<>(results)).size());
     }
 
 
@@ -218,7 +109,7 @@ public class HeadersExchangeTest extends QpidTestCase
 
     private Map<String, Object> getArgsMapFromStrings(String... arguments)
     {
-        Map<String, Object> map = new HashMap<String,Object>();
+        Map<String, Object> map = new HashMap<>();
 
         for(String arg : arguments)
         {
@@ -238,33 +129,8 @@ public class HeadersExchangeTest extends QpidTestCase
     private Queue<?> createAndBind(final String name, Map<String, Object> arguments)
             throws Exception
     {
-        Queue<?> q = create(name);
-        bind(name, arguments, q);
-        return q;
-    }
-
-    private void bind(String bindingKey, Map<String, Object> arguments, Queue<?> q)
-    {
-        _exchange.addBinding(bindingKey,q,arguments);
-    }
-
-    private Queue<?> create(String name)
-    {
-        Queue q = mock(Queue.class);
-        when(q.getName()).thenReturn(name);
-        when(q.toString()).thenReturn(name);
-        when(q.getVirtualHost()).thenReturn(_virtualHost);
-        when(q.getParent()).thenReturn(_virtualHost);
-        when(q.getCategoryClass()).thenReturn(Queue.class);
-        when(q.getObjectFactory()).thenReturn(_factory);
-        when(q.getModel()).thenReturn(_factory.getModel());
-        TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
-        when(q.getTaskExecutor()).thenReturn(taskExecutor);
-        when(q.getChildExecutor()).thenReturn(taskExecutor);
-        when(_virtualHost.getAttainedQueue(name)).thenReturn(q);
-        final RoutingResult routingResult = new RoutingResult(null);
-        routingResult.addQueue(q);
-        when(q.route(any(ServerMessage.class), anyString(), any(InstanceProperties.class))).thenReturn(routingResult);
+        Queue<?> q = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, name));
+        _exchange.addBinding(name, q, arguments);
         return q;
     }
 
@@ -280,13 +146,13 @@ public class HeadersExchangeTest extends QpidTestCase
         Queue<?> q7 = createAndBind("Q7", "F0000", "F0001=Bear");
         Queue<?> q8 = createAndBind("Q8", "F0000=Aardvark", "F0001");
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")),
-                q1, q2, q3, q4, q5, q6, q7, q8);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000")), q1);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")),
+                     q1, q2, q3, q4, q5, q6, q7, q8);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0002")));
 
     }
 
@@ -298,12 +164,12 @@ public class HeadersExchangeTest extends QpidTestCase
         Queue<?> q4 = createAndBind("Q4", "F0000=Aardvark", "F0001", "X-match=any");
         Queue<?> q5 = createAndBind("Q5", "F0000=Apple", "F0001", "X-match=any");
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1, q3);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000")), q1, q3);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0002")));
     }
 
     public void testOnUnbind() throws Exception
@@ -312,78 +178,153 @@ public class HeadersExchangeTest extends QpidTestCase
         Queue<?> q2 = createAndBind("Q2", "F0000=Aardvark");
         Queue<?> q3 = createAndBind("Q3", "F0001");
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000")), q1);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0001")), q3);
 
         _exchange.deleteBinding("Q1",q1);
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")));
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000")));
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark")), q2);
     }
 
 
     public void testWithSelectors() throws Exception
     {
-        Queue<?> q1 = create("Q1");
-        Queue<?> q2 = create("Q2");
-        bind("q1",getArgsMapFromStrings("F"), q1);
-        bind("q1select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q1);
-        bind("q2",getArgsMapFromStrings("F=1"), q2);
+        Queue<?> q1 = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, "Q1"));
+        Queue<?> q2 = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, "Q2"));
+        _exchange.addBinding("q1", q1, getArgsMapFromStrings("F"));
+        _exchange.addBinding("q1select",
+                             q1,
+                             getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString() + "=F='1'"));
+        _exchange.addBinding("q2", q2, getArgsMapFromStrings("F=1"));
+
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F")),q1);
+
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F=1")), q1, q2);
+
+        Queue<?> q3 = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, "Q3"));
+        _exchange.addBinding("q3select",
+                             q3,
+                             getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString() + "=F='1'"));
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F=1")), q1, q2, q3);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F=2")), q1);
+        _exchange.addBinding("q3select2",
+                             q3,
+                             getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString() + "=F='2'"));
+
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F=2")), q1, q3);
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F")),q1);
+    }
+
+    public void testRouteToQueueViaTwoExchanges()
+    {
+        String bindingKey = "key";
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2);
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
 
+        Exchange via = _virtualHost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        Queue<?> q3 = create("Q3");
-        bind("q3select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q3);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2,q3);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1);
-        bind("q3select2",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='2'"), q3);
+        boolean exchToViaBind = _exchange.bind(via.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1,q3);
+        boolean viaToQueueBind = via.bind(queue.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
 
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       bindingKey,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
     }
 
-    private ServerMessage mockMessage(final Map<String, Object> headerValues)
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKey()
     {
-        final AMQMessageHeader header = mock(AMQMessageHeader.class);
-        when(header.containsHeader(anyString())).then(new Answer<Boolean>()
-        {
-            @Override
-            public Boolean answer(InvocationOnMock invocation) throws Throwable
-            {
-                return headerValues.containsKey((String) invocation.getArguments()[0]);
-            }
-        });
-        when(header.getHeader(anyString())).then(new Answer<Object>()
-        {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable
-            {
-                return headerValues.get((String) invocation.getArguments()[0]);
-            }
-        });
-        when(header.getHeaderNames()).thenReturn(headerValues.keySet());
-        when(header.containsHeaders(anySet())).then(new Answer<Boolean>()
-        {
-            @Override
-            public Boolean answer(InvocationOnMock invocation) throws Throwable
-            {
-                final Set names = (Set) invocation.getArguments()[0];
-                return headerValues.keySet().containsAll(names);
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _virtualHost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        String bindingKey = "key";
+        String replacementKey = "key1";
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               bindingKey,
+                                               Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY,
+                                                                        replacementKey),
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        Map<String, Object> arguments = getArgsMapFromStrings("prop=true", "prop2=true", "X-match=any");
+        boolean viaToQueueBind = via.bind(queue.getName(), replacementKey, arguments, false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("prop", true));
+        RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage,
+                                                                                       bindingKey,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+    }
 
-            }
-        });
-        final ServerMessage serverMessage = mock(ServerMessage.class);
-        when(serverMessage.getMessageHeader()).thenReturn(header);
-        when(serverMessage.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        return serverMessage;
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKeyAndFiltering()
+    {
+        String bindingKey = "key1";
+        String replacementKey = "key2";
+
+        Map<String, Object> viaExchangeArguments = new HashMap<>();
+        viaExchangeArguments.put(Exchange.NAME, getTestName() + "_via_exch");
+        viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
+
+        Exchange via = _virtualHost.createChild(Exchange.class, viaExchangeArguments);
+        Queue<?> queue = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+
+        Map<String, Object> exchToViaBindArguments = new HashMap<>();
+        exchToViaBindArguments.put(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, replacementKey);
+        exchToViaBindArguments.put(JMS_SELECTOR.toString(), "prop = True");
+
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               bindingKey,
+                                               exchToViaBindArguments,
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), replacementKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result =
+                _exchange.route(createTestMessage(Collections.singletonMap("prop", true)),
+                                bindingKey,
+                                _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+
+        result = _exchange.route(createTestMessage(Collections.singletonMap("prop", false)),
+                                 bindingKey,
+                                 _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
     }
 
-    public static junit.framework.Test suite()
+    private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
     {
-        return new junit.framework.TestSuite(HeadersExchangeTest.class);
+        AMQMessageHeader header = mock(AMQMessageHeader.class);
+        headerValues.forEach((key, value) -> when(header.getHeader(key)).thenReturn(value));
+        headerValues.forEach((key, value) -> when(header.containsHeader(key)).thenReturn(true));
+        when(header.getHeaderNames()).thenReturn(headerValues.keySet());
+        when(header.containsHeaders(any())).then(invocation ->
+                                                 {
+                                                     final Set<String> names =
+                                                             (Set<String>) invocation.getArguments()[0];
+                                                     return headerValues.keySet().containsAll(names);
+                                                 });
+
+        @SuppressWarnings("unchecked")
+        ServerMessage<?> message = mock(ServerMessage.class);
+        when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        when(message.getMessageHeader()).thenReturn(header);
+        return message;
     }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org