You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/09/12 15:19:33 UTC

[2/3] qpid-broker-j git commit: QPID-7771: [Java Broker] Add support for binding argument 'x-replacement-routing-key'

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
index 73a4597..b39e959 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
@@ -19,25 +19,38 @@
  */
 package org.apache.qpid.server.exchange;
 
+import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.RoutingResult;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AlternateBinding;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class DirectExchangeTest extends QpidTestCase
 {
-    private DirectExchangeImpl _exchange;
+    private DirectExchange<?> _exchange;
     private VirtualHost<?> _vhost;
+    private InstanceProperties _instanceProperties;
+    private ServerMessage<?> _messageWithNoHeaders;
 
     @Override
     public void setUp() throws Exception
@@ -51,8 +64,10 @@ public class DirectExchangeTest extends QpidTestCase
         attributes.put(Exchange.DURABLE, false);
         attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
 
-        _exchange = (DirectExchangeImpl) _vhost.createChild(Exchange.class, attributes);
-        _exchange.open();
+        _exchange = (DirectExchange<?>) _vhost.createChild(Exchange.class, attributes);
+
+        _instanceProperties = mock(InstanceProperties.class);
+        _messageWithNoHeaders = createTestMessage(Collections.emptyMap());
     }
 
     @Override
@@ -128,7 +143,7 @@ public class DirectExchangeTest extends QpidTestCase
         attributes.put(Queue.DURABLE, false);
         attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName()));
 
-        Queue queue = (Queue) _vhost.createChild(Queue.class, attributes);
+        Queue queue = _vhost.createChild(Queue.class, attributes);
         queue.open();
 
         assertEquals("Unexpected alternate exchange on queue", _exchange, queue.getAlternateBindingDestination());
@@ -221,4 +236,197 @@ public class DirectExchangeTest extends QpidTestCase
                      _exchange.getName(),
                      newExchange.getAlternateBinding().getDestination());
     }
+
+    public void testRouteToQueue()
+    {
+        String boundKey = "key";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, boundKey,
+                                                                                       _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue before bind", result.hasRoutes());
+
+        boolean bind = _exchange.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
+        assertTrue("Bind operation should be successful", bind);
+
+        result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue after bind", result.hasRoutes());
+
+        boolean unbind = _exchange.unbind(queue.getName(), boundKey);
+        assertTrue("Unbind operation should be successful", unbind);
+
+        result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue after unbind", result.hasRoutes());
+    }
+
+    public void testRouteToQueueWithSelector()
+    {
+        String boundKey = "key";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        InstanceProperties instanceProperties = mock(InstanceProperties.class);
+        ServerMessage<?> matchingMessage = createTestMessage(Collections.singletonMap("prop", true));
+        ServerMessage<?> unmatchingMessage = createTestMessage(Collections.singletonMap("prop", false));
+
+        boolean bind = _exchange.bind(queue.getName(), boundKey,
+                                      Collections.singletonMap(JMS_SELECTOR.toString(), "prop = True"),
+                                      false);
+        assertTrue("Bind operation should be successful", bind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(matchingMessage, boundKey, instanceProperties);
+        assertTrue("Message with matching selector not routed to queue", result.hasRoutes());
+
+        result = _exchange.route(unmatchingMessage, boundKey, instanceProperties);
+        assertFalse("Message without matching selector unexpectedly routed to queue", result.hasRoutes());
+
+        boolean unbind = _exchange.unbind(queue.getName(), boundKey);
+        assertTrue("Unbind operation should be successful", unbind);
+
+        result = _exchange.route(matchingMessage, boundKey, instanceProperties);
+        assertFalse("Message with matching selector unexpectedly routed to queue after unbind", result.hasRoutes());
+    }
+
+    public void testRouteToQueueViaTwoExchanges()
+    {
+        String boundKey = "key";
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _vhost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        boolean exchToViaBind = _exchange.bind(via.getName(), boundKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       boundKey,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+    }
+
+
+    public void testDestinationDeleted()
+    {
+        String boundKey = "key";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        assertFalse(_exchange.isBound(boundKey));
+        assertFalse(_exchange.isBound(boundKey, queue));
+        assertFalse(_exchange.isBound(queue));
+
+        _exchange.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
+
+        assertTrue(_exchange.isBound(boundKey));
+        assertTrue(_exchange.isBound(boundKey, queue));
+        assertTrue(_exchange.isBound(queue));
+
+        queue.delete();
+
+        assertFalse(_exchange.isBound(boundKey));
+        assertFalse(_exchange.isBound(boundKey, queue));
+        assertFalse(_exchange.isBound(queue));
+    }
+
+    private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
+    {
+        AMQMessageHeader header = mock(AMQMessageHeader.class);
+        headerValues.forEach((key, value) -> when(header.getHeader(key)).thenReturn(value));
+
+        @SuppressWarnings("unchecked")
+        ServerMessage<?> message = mock(ServerMessage.class);
+        when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        when(message.getMessageHeader()).thenReturn(header);
+        return message;
+    }
+
+    public void testRouteToMultipleQueues()
+    {
+        String boundKey = "key";
+        Queue<?> queue1 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue1"));
+        Queue<?> queue2 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue2"));
+
+        boolean bind1 = _exchange.bind(queue1.getName(), boundKey, Collections.emptyMap(), false);
+        assertTrue("Bind operation to queue1 should be successful", bind1);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
+
+        _exchange.bind(queue2.getName(), boundKey, Collections.singletonMap(JMS_SELECTOR.toString(), "prop is null"), false);
+
+        result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 2, result.getNumberOfRoutes());
+
+        _exchange.unbind(queue1.getName(), boundKey);
+
+        result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
+
+        _exchange.unbind(queue2.getName(), boundKey);
+        result = _exchange.route(_messageWithNoHeaders, boundKey, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
+    }
+
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKey()
+    {
+        Map<String, Object> viaExchangeArguments = new HashMap<>();
+        viaExchangeArguments.put(Exchange.NAME, "via_exchange");
+        viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _vhost.createChild(Exchange.class, viaExchangeArguments);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               "key1",
+                                               Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "key2"),
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), "key2", Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       "key1",
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+    }
+
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKeyAndFiltering()
+    {
+        Map<String, Object> viaExchangeArguments = new HashMap<>();
+        viaExchangeArguments.put(Exchange.NAME, getTestName() + "_via_exch");
+        viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _vhost.createChild(Exchange.class, viaExchangeArguments);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+
+        Map<String, Object> exchToViaBindArguments = new HashMap<>();
+        exchToViaBindArguments.put(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "key2");
+        exchToViaBindArguments.put(JMS_SELECTOR.toString(), "prop = True");
+
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               "key1",
+                                               exchToViaBindArguments,
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), "key2", Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(createTestMessage(Collections.singletonMap("prop", true)),
+                                                                                                            "key1",
+                                                                                                            _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+
+        result = _exchange.route(createTestMessage(Collections.singletonMap("prop", false)),
+                                 "key1",
+                                 _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index 1d7a3b0..cb4cadf 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -20,342 +20,345 @@
  */
 package org.apache.qpid.server.exchange;
 
+import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anySet;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.apache.qpid.server.filter.AMQPFilterTypes;
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class FanoutExchangeTest extends QpidTestCase
 {
-    private FanoutExchangeImpl _exchange;
-    private QueueManagingVirtualHost _virtualHost;
-    private TaskExecutor _taskExecutor;
+    private FanoutExchange<?> _exchange;
+    private VirtualHost<?> _vhost;
+    private InstanceProperties _instanceProperties;
+    private ServerMessage<?> _messageWithNoHeaders;
 
     @Override
-    public void setUp()
+    public void setUp() throws Exception
     {
-        Map<String,Object> attributes = new HashMap<String, Object>();
-        attributes.put(Exchange.ID, UUID.randomUUID());
+        super.setUp();
+
+        BrokerTestHelper.setUp();
+        _vhost = BrokerTestHelper.createVirtualHost(getName());
+
+        Map<String,Object> attributes = new HashMap<>();
         attributes.put(Exchange.NAME, "test");
         attributes.put(Exchange.DURABLE, false);
+        attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
 
-        Broker broker = mock(Broker.class);
-        when(broker.getCategoryClass()).thenReturn(Broker.class);
-        when(broker.getModel()).thenReturn(BrokerModel.getInstance());
-
-        VirtualHostNode virtualHostNode = mock(VirtualHostNode.class);
-        when(virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class);
-        when(virtualHostNode.getParent()).thenReturn(broker);
-        when(virtualHostNode.getModel()).thenReturn(BrokerModel.getInstance());
-
-        _taskExecutor = new CurrentThreadTaskExecutor();
-        _taskExecutor.start();
-        _virtualHost = mock(QueueManagingVirtualHost.class);
-
-        when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
-        when(_virtualHost.getState()).thenReturn(State.ACTIVE);
-        when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor);
-        when(_virtualHost.getChildExecutor()).thenReturn(_taskExecutor);
-        when(_virtualHost.getModel()).thenReturn(BrokerModel.getInstance());
-        when(_virtualHost.getParent()).thenReturn(virtualHostNode);
-        when(_virtualHost.getCategoryClass()).thenReturn(VirtualHost.class);
-        _exchange = new FanoutExchangeImpl(attributes, _virtualHost);
+        _exchange = (FanoutExchange<?>) _vhost.createChild(Exchange.class, attributes);
         _exchange.open();
+
+        _instanceProperties = mock(InstanceProperties.class);
+        _messageWithNoHeaders = createTestMessage(Collections.emptyMap());
     }
 
     @Override
-
     public void tearDown() throws Exception
     {
-        super.tearDown();
-        _taskExecutor.stop();
+        try
+        {
+            if (_vhost != null)
+            {
+                _vhost.close();
+            }
+        }
+        finally
+        {
+            BrokerTestHelper.tearDown();
+            super.tearDown();
+        }
     }
 
-    public void testIsBoundStringMapAMQQueueWhenQueueIsNull()
+    public void testRouteToQueue() throws Exception
     {
-        assertFalse("calling isBound(AMQShortString,FieldTable,Queue<?>) with null queue should return false",
-                _exchange.isBound((String) null, (Map) null, (Queue<?>) null));
-    }
+        String bindingKey = "mybinding";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-    public void testIsBoundStringAMQQueueWhenQueueIsNull()
-    {
-        assertFalse("calling isBound(AMQShortString,Queue<?>) with null queue should return false",
-                _exchange.isBound((String) null, (Queue<?>) null));
-    }
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, null,
+                                                                                       _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue before bind", result.hasRoutes());
 
-    public void testIsBoundAMQQueueWhenQueueIsNull()
-    {
-        assertFalse("calling isBound(AMQQueue) with null queue should return false", _exchange.isBound((Queue<?>) null));
-    }
+        boolean bind = _exchange.bind(queue.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Bind operation should be successful", bind);
 
-    public void testIsBoundStringMapAMQQueue()
-    {
-        Queue<?> queue = bindQueue("matters");
-        assertTrue("Should return true for a bound queue",
-                _exchange.isBound("matters", null, queue));
-    }
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertTrue("Message not routed to queue after bind", result.hasRoutes());
+
+        boolean unbind = _exchange.unbind(queue.getName(), bindingKey);
+        assertTrue("Unbind operation should be successful", unbind);
+
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue after unbind", result.hasRoutes());
 
-    public void testIsBoundStringAMQQueue()
-    {
-        Queue<?> queue = bindQueue("matters");
-        assertTrue("Should return true for a bound queue",
-                _exchange.isBound("matters", queue));
     }
 
-    public void testIsBoundAMQQueue()
+    public void testRouteToQueueWithSelector()
     {
-        Queue<?> queue = bindQueue("matters");
-        assertTrue("Should return true for a bound queue",
-                _exchange.isBound(queue));
-    }
+        String bindingKey = "mybinding";
 
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-    public void testRouteToDestination() throws Exception
-    {
-        List<? extends BaseQueue> result;
-        Queue<?> queue = mockQueue();
+        InstanceProperties instanceProperties = mock(InstanceProperties.class);
+        ServerMessage<?> matchingMessage = createTestMessage(Collections.singletonMap("prop", true));
+        ServerMessage<?> unmatchingMessage = createTestMessage(Collections.singletonMap("prop", false));
 
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange without bindings routed message to unexpected number of queues", 0, result.size());
+        boolean bind = _exchange.bind(queue.getName(), bindingKey,
+                                      Collections.singletonMap(JMS_SELECTOR.toString(), "prop = True"),
+                                      false);
+        assertTrue("Bind operation should be successful", bind);
 
-        _exchange.addBinding("key", queue, null);
+        RoutingResult<ServerMessage<?>> result = _exchange.route(matchingMessage, null, instanceProperties);
+        assertTrue("Message with matching selector not routed to queue", result.hasRoutes());
 
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange with 1 binding routed message to unexpected number of queues", 1, result.size());
+        result = _exchange.route(unmatchingMessage, null, instanceProperties);
+        assertFalse("Message without matching selector unexpectedly routed to queue", result.hasRoutes());
 
-        _exchange.deleteBinding("key", queue);
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange with no bindings routed message to unexpected number of queues", 0, result.size());
+        boolean unbind = _exchange.unbind(queue.getName(), bindingKey);
+        assertTrue("Unbind operation should be successful", unbind);
+
+        result = _exchange.route(matchingMessage, null, instanceProperties);
+        assertFalse("Message with matching selector unexpectedly routed to queue after unbind", result.hasRoutes());
     }
 
-    public void testDestinationRemoved() throws Exception
+    public void testRouteToQueueViaTwoExchanges()
     {
-        List<? extends BaseQueue> result;
-        Queue<?> queue = mockQueue();
+        String bindingKey = "key";
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
 
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange without bindings routed message to unexpected number of queues", 0, result.size());
+        Exchange via = _vhost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        _exchange.addBinding("key", queue, null);
+        boolean exchToViaBind = _exchange.bind(via.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
 
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange with 1 binding routed message to unexpected number of queues", 1, result.size());
+        boolean viaToQueueBind = via.bind(queue.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
 
-        _exchange.destinationRemoved(queue);
-        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
-        assertEquals("Fanout exchange with no bindings routed message to unexpected number of queues", 0, result.size());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       null,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
     }
 
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKey()
+    {
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _vhost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               "key",
+                                               Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "key1"),
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), "key1", Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       null,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+    }
 
-    public void testRoutingWithSelectors() throws Exception
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKeyAndFiltering()
     {
-        Queue<?> queue = mockQueue();
+        Map<String, Object> viaExchangeArguments = new HashMap<>();
+        viaExchangeArguments.put(Exchange.NAME, getTestName() + "_via_exch");
+        viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
 
-        List<? extends BaseQueue> result;
+        Exchange via = _vhost.createChild(Exchange.class, viaExchangeArguments);
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        _exchange.addBinding("key2", queue, Collections.<String, Object>singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),"prop = True"));
 
-        result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
+        Map<String, Object> exchToViaBindArguments = new HashMap<>();
+        exchToViaBindArguments.put(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, "key2");
+        exchToViaBindArguments.put(JMS_SELECTOR.toString(), "prop = True");
 
-        assertEquals("Expected matching message to be routed to queue", 1, result.size());
-        assertTrue("Expected matching message to be routed to queue", result.contains(queue));
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               "key1",
+                                               exchToViaBindArguments,
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
 
-        result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
+        boolean viaToQueueBind = via.bind(queue.getName(), "key2", Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
 
-        assertEquals("Expected non matching message not to be routed to queue", 0, result.size());
+        RoutingResult<ServerMessage<?>> result = _exchange.route(createTestMessage(Collections.singletonMap("prop", true)),
+                                                                                       "key1",
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+
+        result = _exchange.route(createTestMessage(Collections.singletonMap("prop", false)),
+                                 "key1",
+                                 _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
     }
 
-    public void testMultipleBindings() throws Exception
+    public void testRouteToMultipleQueue()
     {
-        Queue<?> queue1 = mockQueue();
-        Queue<?> queue2 = mockQueue();
+        String bindingKey = "key";
+        Queue<?> queue1 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue1"));
+        Queue<?> queue2 = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue2"));
 
-        List<? extends BaseQueue> result;
+        boolean bind1 = _exchange.bind(queue1.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Bind operation to queue1 should be successful", bind1);
 
-        _exchange.addBinding("key", queue1, null);
-        _exchange.addBinding("key", queue2, null);
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-        result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
+        _exchange.bind(queue2.getName(), bindingKey, Collections.singletonMap(JMS_SELECTOR.toString(), "prop is null"), false);
 
-        assertEquals("Expected message to be routed to both queues", 2, result.size());
-        assertTrue("Expected queue1 to be in routing result", result.contains(queue1));
-        assertTrue("Expected queue2 to be in routing result", result.contains(queue2));
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 2, result.getNumberOfRoutes());
 
-        _exchange.addBinding("key1", queue2, null);
+        _exchange.unbind(queue1.getName(), bindingKey);
 
-        result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-        assertEquals("Expected message to be routed to both queues", 2, result.size());
-        assertTrue("Expected queue1 to be in routing result", result.contains(queue1));
-        assertTrue("Expected queue2 to be in routing result", result.contains(queue2));
+        _exchange.unbind(queue2.getName(), bindingKey);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
     }
 
-    private Queue<?> bindQueue(final String bindingKey)
+    public void testRouteToQueueBoundMultipleTimesUsingTheSameBindingKey()
     {
-        Queue<?> queue = mockQueue();
+        String bindingKey = "key";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        _exchange.addBinding(bindingKey, queue, null);
-        return queue;
-    }
+        boolean bind1 = _exchange.bind(queue.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("First bind operation to queue should be successful", bind1);
 
-    private Queue<?> mockQueue()
-    {
-        Queue queue = mock(Queue.class);
-        String name = UUID.randomUUID().toString();
-        when(queue.getName()).thenReturn(name);
-        when(queue.getVirtualHost()).thenReturn(_virtualHost);
-        when(queue.getCategoryClass()).thenReturn(Queue.class);
-        when(queue.getModel()).thenReturn(BrokerModel.getInstance());
-        TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
-        when(queue.getTaskExecutor()).thenReturn(taskExecutor);
-        when(queue.getChildExecutor()).thenReturn(taskExecutor);
-        when(queue.getParent()).thenReturn(_virtualHost);
-        when(_virtualHost.getAttainedQueue(eq(name))).thenReturn(queue);
-        RoutingResult result = new RoutingResult(null);
-        result.addQueue(queue);
-        when(queue.route(any(ServerMessage.class),anyString(),any(InstanceProperties.class))).thenReturn(result);
-        return queue;
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
+
+        boolean bind2 = _exchange.bind(queue.getName(), bindingKey, Collections.emptyMap(), true);
+        assertTrue("Repeated bind operation to queue using the same binding key should be successful", bind2);
+
+        RoutingResult<ServerMessage<?>> result2 = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result2.getNumberOfRoutes());
+
+        _exchange.unbind(queue.getName(), bindingKey);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
     }
 
-    private List<? extends BaseQueue> routeToQueues(final ServerMessage message,
-                                                    final String routingAddress,
-                                                    final InstanceProperties instanceProperties)
+    public void testRouteToQueueBoundMultipleTimesUsingDifferentBindingKeys()
     {
-        RoutingResult result = _exchange.route(message, routingAddress, instanceProperties);
-        final List<BaseQueue> resultQueues = new ArrayList<>();
-        result.send(new ServerTransaction()
-        {
-            @Override
-            public long getTransactionStartTime()
-            {
-                return 0;
-            }
+        String bindingKey1 = "key1";
+        String bindingKey2 = "key2";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-            @Override
-            public long getTransactionUpdateTime()
-            {
-                return 0;
-            }
+        boolean bind1 = _exchange.bind(queue.getName(), bindingKey1, Collections.emptyMap(), false);
+        assertTrue("Bind operation to queue using the first binding key should be successful", bind1);
 
-            @Override
-            public void addPostTransactionAction(final Action postTransactionAction)
-            {
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-            }
+        boolean bind2 = _exchange.bind(queue.getName(), bindingKey2, Collections.emptyMap(), true);
+        assertTrue("Bind operation to queue using the second binding key should be successful", bind2);
 
-            @Override
-            public void dequeue(final MessageEnqueueRecord record, final Action postTransactionAction)
-            {
+        RoutingResult<ServerMessage<?>> result2 = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result2.getNumberOfRoutes());
 
-            }
+        _exchange.unbind(queue.getName(), bindingKey1);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-            @Override
-            public void dequeue(final Collection<MessageInstance> messages, final Action postTransactionAction)
-            {
+        _exchange.unbind(queue.getName(), bindingKey2);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
+    }
 
-            }
+    public void testRouteToQueueBoundMultipleTimesUsingFilteredAndUnfilteredBindings()
+    {
+        String bindingKey1 = "key1";
+        String bindingKey2 = "key2";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-            @Override
-            public void enqueue(final TransactionLogResource queue,
-                                final EnqueueableMessage message,
-                                final EnqueueAction postTransactionAction)
-            {
-                resultQueues.add((BaseQueue) queue);
-            }
+        Map<String, Object> argumentsWithFilter = Collections.singletonMap(JMS_SELECTOR.toString(), "prop = True");
+        boolean bind1 = _exchange.bind(queue.getName(), bindingKey1,
+                                       argumentsWithFilter, false);
+        assertTrue("Bind operation to queue using the filtered binding should be successful", bind1);
 
-            @Override
-            public void enqueue(final Collection<? extends BaseQueue> queues,
-                                final EnqueueableMessage message,
-                                final EnqueueAction postTransactionAction)
-            {
-                resultQueues.addAll(queues);
-            }
+        final ServerMessage<?> messageMatchingSelector =
+                createTestMessage(Collections.singletonMap("prop", true));
+        RoutingResult<ServerMessage<?>> result = _exchange.route(messageMatchingSelector, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-            @Override
-            public void commit()
-            {
+        boolean bind2 = _exchange.bind(queue.getName(), bindingKey2, Collections.emptyMap(), true);
+        assertTrue("Bind operation to queue using the unfiltered binding should be successful", bind2);
 
-            }
+        RoutingResult<ServerMessage<?>> result2 = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result2.getNumberOfRoutes());
 
-            @Override
-            public void commit(final Runnable immediatePostTransactionAction)
-            {
+        _exchange.unbind(queue.getName(), bindingKey2);
+        result = _exchange.route(_messageWithNoHeaders, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
 
-            }
+        result = _exchange.route(messageMatchingSelector, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 1, result.getNumberOfRoutes());
 
-            @Override
-            public void rollback()
-            {
+        _exchange.unbind(queue.getName(), bindingKey1);
+        result = _exchange.route(messageMatchingSelector, null, _instanceProperties);
+        assertEquals("Message routed to unexpected number of queues", 0, result.getNumberOfRoutes());
+    }
 
-            }
+    public void testIsBound()
+    {
+        String boundKey = "key";
+        Queue<?> queue = _vhost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-            @Override
-            public boolean isTransactional()
-            {
-                return false;
-            }
-        }, null);
+        assertFalse(_exchange.isBound(boundKey));
+        assertFalse(_exchange.isBound(boundKey, queue));
+        assertFalse(_exchange.isBound(queue));
 
-        return resultQueues;
+        _exchange.bind(queue.getName(), boundKey, Collections.emptyMap(), false);
+
+        assertTrue(_exchange.isBound(boundKey));
+        assertTrue(_exchange.isBound(boundKey, queue));
+        assertTrue(_exchange.isBound(queue));
+
+        queue.delete();
+
+        assertFalse(_exchange.isBound(boundKey));
+        assertFalse(_exchange.isBound(boundKey, queue));
+        assertFalse(_exchange.isBound(queue));
     }
 
-    private ServerMessage mockMessage(boolean propValue)
+    private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
     {
-        final AMQMessageHeader header = mock(AMQMessageHeader.class);
-        when(header.containsHeader("prop")).thenReturn(true);
-        when(header.getHeader("prop")).thenReturn(propValue);
-        when(header.getHeaderNames()).thenReturn(Collections.singleton("prop"));
-        when(header.containsHeaders(anySet())).then(new Answer<Object>()
-        {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable
-            {
-                final Set names = (Set) invocation.getArguments()[0];
-                return names.size() == 1 && names.contains("select");
-
-            }
-        });
-        final ServerMessage serverMessage = mock(ServerMessage.class);
-        when(serverMessage.getMessageHeader()).thenReturn(header);
-        when(serverMessage.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        return serverMessage;
+        AMQMessageHeader header = mock(AMQMessageHeader.class);
+        headerValues.forEach((key, value) -> when(header.getHeader(key)).thenReturn(value));
+
+        @SuppressWarnings("unchecked")
+        ServerMessage<?> message = mock(ServerMessage.class);
+        when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        when(message.getMessageHeader()).thenReturn(header);
+        return message;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1abc9359/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index ca7f46e..ced6924 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -20,193 +20,84 @@
  */
 package org.apache.qpid.server.exchange;
 
+import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anySet;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.qpid.server.filter.AMQPFilterTypes;
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class HeadersExchangeTest extends QpidTestCase
 {
-    private HeadersExchangeImpl _exchange;
-    private QueueManagingVirtualHost _virtualHost;
-    private TaskExecutor _taskExecutor;
-    private ConfiguredObjectFactoryImpl _factory;
+    private HeadersExchange<?> _exchange;
+    private QueueManagingVirtualHost<?> _virtualHost;
+    private InstanceProperties _instanceProperties;
+    private ServerMessage<?> _messageWithNoHeaders;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
 
-        _taskExecutor = new CurrentThreadTaskExecutor();
-        _taskExecutor.start();
-        _virtualHost = mock(QueueManagingVirtualHost.class);
-
-        Broker broker = mock(Broker.class);
-        when(broker.getCategoryClass()).thenReturn(Broker.class);
-        when(broker.getModel()).thenReturn(BrokerModel.getInstance());
-
-        VirtualHostNode virtualHostNode = mock(VirtualHostNode.class);
-        when(virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class);
-        when(virtualHostNode.getParent()).thenReturn(broker);
-        when(virtualHostNode.getModel()).thenReturn(BrokerModel.getInstance());
-
-        when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
-        when(_virtualHost.getCategoryClass()).thenReturn(VirtualHost.class);
-        when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor);
-        when(_virtualHost.getChildExecutor()).thenReturn(_taskExecutor);
-        when(_virtualHost.getState()).thenReturn(State.ACTIVE);
-
-        _factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
-        when(_virtualHost.getObjectFactory()).thenReturn(_factory);
-        when(_virtualHost.getModel()).thenReturn(_factory.getModel());
-        when(_virtualHost.getParent()).thenReturn(virtualHostNode);
-        Map<String,Object> attributes = new HashMap<String, Object>();
-        attributes.put(Exchange.ID, UUID.randomUUID());
+        _virtualHost = BrokerTestHelper.createVirtualHost("test");
+
+        Map<String,Object> attributes = new HashMap<>();
         attributes.put(Exchange.NAME, "test");
         attributes.put(Exchange.DURABLE, false);
+        attributes.put(Exchange.TYPE, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
 
-        _exchange = new HeadersExchangeImpl(attributes, _virtualHost);
+        _exchange = (HeadersExchange) _virtualHost.createChild(Exchange.class, attributes);
+
+        _instanceProperties = mock(InstanceProperties.class);
+        _messageWithNoHeaders = createTestMessage(Collections.emptyMap());
 
     }
 
     @Override
     public void tearDown() throws Exception
     {
+        if (_virtualHost  != null)
+        {
+            _virtualHost.close();
+        }
         super.tearDown();
-        _taskExecutor.stop();
+
     }
 
-    protected void routeAndTest(ServerMessage msg, Queue<?>... expected) throws Exception
+    private void routeAndTest(ServerMessage msg, Queue<?>... expected) throws Exception
     {
-        List<? extends BaseQueue> results = routeToQueues(msg, "", InstanceProperties.EMPTY);
-        List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
+        RoutingResult<?> result = _exchange.route(msg, "", InstanceProperties.EMPTY);
+        Collection<BaseQueue> results = result.getRoutes();
+        List<BaseQueue> unexpected = new ArrayList<>(results);
         unexpected.removeAll(Arrays.asList(expected));
         assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
-        List<? extends BaseQueue> missing = new ArrayList<BaseQueue>(Arrays.asList(expected));
+        List<BaseQueue> missing = new ArrayList<>(Arrays.asList(expected));
         missing.removeAll(results);
         assertTrue("Message not delivered to expected queues: " + missing, missing.isEmpty());
-        assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size());
-    }
-
-    private List<? extends BaseQueue> routeToQueues(final ServerMessage message,
-                                                    final String routingAddress,
-                                                    final InstanceProperties instanceProperties)
-    {
-        RoutingResult result = _exchange.route(message, routingAddress, instanceProperties);
-        final List<BaseQueue> resultQueues = new ArrayList<>();
-        result.send(new ServerTransaction()
-        {
-            @Override
-            public long getTransactionStartTime()
-            {
-                return 0;
-            }
-
-            @Override
-            public long getTransactionUpdateTime()
-            {
-                return 0;
-            }
-
-            @Override
-            public void addPostTransactionAction(final Action postTransactionAction)
-            {
-
-            }
-
-            @Override
-            public void dequeue(final MessageEnqueueRecord record, final Action postTransactionAction)
-            {
-
-            }
-
-            @Override
-            public void dequeue(final Collection<MessageInstance> messages, final Action postTransactionAction)
-            {
-
-            }
-
-            @Override
-            public void enqueue(final TransactionLogResource queue,
-                                final EnqueueableMessage message,
-                                final EnqueueAction postTransactionAction)
-            {
-                resultQueues.add((BaseQueue) queue);
-            }
-
-            @Override
-            public void enqueue(final Collection<? extends BaseQueue> queues,
-                                final EnqueueableMessage message,
-                                final EnqueueAction postTransactionAction)
-            {
-                resultQueues.addAll(queues);
-            }
-
-            @Override
-            public void commit()
-            {
-
-            }
-
-            @Override
-            public void commit(final Runnable immediatePostTransactionAction)
-            {
-
-            }
-
-            @Override
-            public void rollback()
-            {
-
-            }
-
-            @Override
-            public boolean isTransactional()
-            {
-                return false;
-            }
-        }, null);
-
-        return resultQueues;
+        assertTrue("Duplicates " + results, results.size()==(new HashSet<>(results)).size());
     }
 
 
@@ -218,7 +109,7 @@ public class HeadersExchangeTest extends QpidTestCase
 
     private Map<String, Object> getArgsMapFromStrings(String... arguments)
     {
-        Map<String, Object> map = new HashMap<String,Object>();
+        Map<String, Object> map = new HashMap<>();
 
         for(String arg : arguments)
         {
@@ -238,33 +129,8 @@ public class HeadersExchangeTest extends QpidTestCase
     private Queue<?> createAndBind(final String name, Map<String, Object> arguments)
             throws Exception
     {
-        Queue<?> q = create(name);
-        bind(name, arguments, q);
-        return q;
-    }
-
-    private void bind(String bindingKey, Map<String, Object> arguments, Queue<?> q)
-    {
-        _exchange.addBinding(bindingKey,q,arguments);
-    }
-
-    private Queue<?> create(String name)
-    {
-        Queue q = mock(Queue.class);
-        when(q.getName()).thenReturn(name);
-        when(q.toString()).thenReturn(name);
-        when(q.getVirtualHost()).thenReturn(_virtualHost);
-        when(q.getParent()).thenReturn(_virtualHost);
-        when(q.getCategoryClass()).thenReturn(Queue.class);
-        when(q.getObjectFactory()).thenReturn(_factory);
-        when(q.getModel()).thenReturn(_factory.getModel());
-        TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
-        when(q.getTaskExecutor()).thenReturn(taskExecutor);
-        when(q.getChildExecutor()).thenReturn(taskExecutor);
-        when(_virtualHost.getAttainedQueue(name)).thenReturn(q);
-        final RoutingResult routingResult = new RoutingResult(null);
-        routingResult.addQueue(q);
-        when(q.route(any(ServerMessage.class), anyString(), any(InstanceProperties.class))).thenReturn(routingResult);
+        Queue<?> q = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, name));
+        _exchange.addBinding(name, q, arguments);
         return q;
     }
 
@@ -280,13 +146,13 @@ public class HeadersExchangeTest extends QpidTestCase
         Queue<?> q7 = createAndBind("Q7", "F0000", "F0001=Bear");
         Queue<?> q8 = createAndBind("Q8", "F0000=Aardvark", "F0001");
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")),
-                q1, q2, q3, q4, q5, q6, q7, q8);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000")), q1);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")),
+                     q1, q2, q3, q4, q5, q6, q7, q8);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0002")));
 
     }
 
@@ -298,12 +164,12 @@ public class HeadersExchangeTest extends QpidTestCase
         Queue<?> q4 = createAndBind("Q4", "F0000=Aardvark", "F0001", "X-match=any");
         Queue<?> q5 = createAndBind("Q5", "F0000=Apple", "F0001", "X-match=any");
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1, q3);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0002")));
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000")), q1, q3);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0002")));
     }
 
     public void testOnUnbind() throws Exception
@@ -312,78 +178,153 @@ public class HeadersExchangeTest extends QpidTestCase
         Queue<?> q2 = createAndBind("Q2", "F0000=Aardvark");
         Queue<?> q3 = createAndBind("Q3", "F0001");
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000")), q1);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0001")), q3);
 
         _exchange.deleteBinding("Q1",q1);
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000")));
-        routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000")));
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F0000=Aardvark")), q2);
     }
 
 
     public void testWithSelectors() throws Exception
     {
-        Queue<?> q1 = create("Q1");
-        Queue<?> q2 = create("Q2");
-        bind("q1",getArgsMapFromStrings("F"), q1);
-        bind("q1select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q1);
-        bind("q2",getArgsMapFromStrings("F=1"), q2);
+        Queue<?> q1 = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, "Q1"));
+        Queue<?> q2 = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, "Q2"));
+        _exchange.addBinding("q1", q1, getArgsMapFromStrings("F"));
+        _exchange.addBinding("q1select",
+                             q1,
+                             getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString() + "=F='1'"));
+        _exchange.addBinding("q2", q2, getArgsMapFromStrings("F=1"));
+
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F")),q1);
+
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F=1")), q1, q2);
+
+        Queue<?> q3 = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, "Q3"));
+        _exchange.addBinding("q3select",
+                             q3,
+                             getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString() + "=F='1'"));
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F=1")), q1, q2, q3);
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F=2")), q1);
+        _exchange.addBinding("q3select2",
+                             q3,
+                             getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString() + "=F='2'"));
+
+        routeAndTest(createTestMessage(getArgsMapFromStrings("F=2")), q1, q3);
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F")),q1);
+    }
+
+    public void testRouteToQueueViaTwoExchanges()
+    {
+        String bindingKey = "key";
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2);
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
 
+        Exchange via = _virtualHost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
 
-        Queue<?> q3 = create("Q3");
-        bind("q3select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q3);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2,q3);
-        routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1);
-        bind("q3select2",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='2'"), q3);
+        boolean exchToViaBind = _exchange.bind(via.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
 
-        routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1,q3);
+        boolean viaToQueueBind = via.bind(queue.getName(), bindingKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
 
+        RoutingResult<ServerMessage<?>> result = _exchange.route(_messageWithNoHeaders,
+                                                                                       bindingKey,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
     }
 
-    private ServerMessage mockMessage(final Map<String, Object> headerValues)
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKey()
     {
-        final AMQMessageHeader header = mock(AMQMessageHeader.class);
-        when(header.containsHeader(anyString())).then(new Answer<Boolean>()
-        {
-            @Override
-            public Boolean answer(InvocationOnMock invocation) throws Throwable
-            {
-                return headerValues.containsKey((String) invocation.getArguments()[0]);
-            }
-        });
-        when(header.getHeader(anyString())).then(new Answer<Object>()
-        {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable
-            {
-                return headerValues.get((String) invocation.getArguments()[0]);
-            }
-        });
-        when(header.getHeaderNames()).thenReturn(headerValues.keySet());
-        when(header.containsHeaders(anySet())).then(new Answer<Boolean>()
-        {
-            @Override
-            public Boolean answer(InvocationOnMock invocation) throws Throwable
-            {
-                final Set names = (Set) invocation.getArguments()[0];
-                return headerValues.keySet().containsAll(names);
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, getTestName());
+        attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Exchange via = _virtualHost.createChild(Exchange.class, attributes);
+        Queue<?> queue = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+        String bindingKey = "key";
+        String replacementKey = "key1";
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               bindingKey,
+                                               Collections.singletonMap(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY,
+                                                                        replacementKey),
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        Map<String, Object> arguments = getArgsMapFromStrings("prop=true", "prop2=true", "X-match=any");
+        boolean viaToQueueBind = via.bind(queue.getName(), replacementKey, arguments, false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        ServerMessage<?> testMessage = createTestMessage(Collections.singletonMap("prop", true));
+        RoutingResult<ServerMessage<?>> result = _exchange.route(testMessage,
+                                                                                       bindingKey,
+                                                                                       _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+    }
 
-            }
-        });
-        final ServerMessage serverMessage = mock(ServerMessage.class);
-        when(serverMessage.getMessageHeader()).thenReturn(header);
-        when(serverMessage.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
-        return serverMessage;
+    public void testRouteToQueueViaTwoExchangesWithReplacementRoutingKeyAndFiltering()
+    {
+        String bindingKey = "key1";
+        String replacementKey = "key2";
+
+        Map<String, Object> viaExchangeArguments = new HashMap<>();
+        viaExchangeArguments.put(Exchange.NAME, getTestName() + "_via_exch");
+        viaExchangeArguments.put(Exchange.TYPE, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
+
+        Exchange via = _virtualHost.createChild(Exchange.class, viaExchangeArguments);
+        Queue<?> queue = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_queue"));
+
+
+        Map<String, Object> exchToViaBindArguments = new HashMap<>();
+        exchToViaBindArguments.put(Binding.BINDING_ARGUMENT_REPLACEMENT_ROUTING_KEY, replacementKey);
+        exchToViaBindArguments.put(JMS_SELECTOR.toString(), "prop = True");
+
+        boolean exchToViaBind = _exchange.bind(via.getName(),
+                                               bindingKey,
+                                               exchToViaBindArguments,
+                                               false);
+        assertTrue("Exchange to exchange bind operation should be successful", exchToViaBind);
+
+        boolean viaToQueueBind = via.bind(queue.getName(), replacementKey, Collections.emptyMap(), false);
+        assertTrue("Exchange to queue bind operation should be successful", viaToQueueBind);
+
+        RoutingResult<ServerMessage<?>> result =
+                _exchange.route(createTestMessage(Collections.singletonMap("prop", true)),
+                                bindingKey,
+                                _instanceProperties);
+        assertTrue("Message unexpectedly not routed to queue", result.hasRoutes());
+
+        result = _exchange.route(createTestMessage(Collections.singletonMap("prop", false)),
+                                 bindingKey,
+                                 _instanceProperties);
+        assertFalse("Message unexpectedly routed to queue", result.hasRoutes());
     }
 
-    public static junit.framework.Test suite()
+    private ServerMessage<?> createTestMessage(Map<String, Object> headerValues)
     {
-        return new junit.framework.TestSuite(HeadersExchangeTest.class);
+        AMQMessageHeader header = mock(AMQMessageHeader.class);
+        headerValues.forEach((key, value) -> when(header.getHeader(key)).thenReturn(value));
+        headerValues.forEach((key, value) -> when(header.containsHeader(key)).thenReturn(true));
+        when(header.getHeaderNames()).thenReturn(headerValues.keySet());
+        when(header.containsHeaders(any())).then(invocation ->
+                                                 {
+                                                     final Set<String> names =
+                                                             (Set<String>) invocation.getArguments()[0];
+                                                     return headerValues.keySet().containsAll(names);
+                                                 });
+
+        @SuppressWarnings("unchecked")
+        ServerMessage<?> message = mock(ServerMessage.class);
+        when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true);
+        when(message.getMessageHeader()).thenReturn(header);
+        return message;
     }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org