You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/01/27 16:38:23 UTC

svn commit: r1780580 - in /qpid/java/trunk/broker-core/src: main/java/org/apache/qpid/server/exchange/AbstractExchange.java test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java

Author: kwall
Date: Fri Jan 27 16:38:23 2017
New Revision: 1780580

URL: http://svn.apache.org/viewvc?rev=1780580&view=rev
Log:
QPID-6028: [Java Broker] Ensure queue removal triggers exchange unbind

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1780580&r1=1780579&r2=1780580&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Jan 27 16:38:23 2017
@@ -248,7 +248,7 @@ public abstract class AbstractExchange<T
                 final Map<String, Object> bindArguments =
                         UNBIND_ARGUMENTS_CREATOR.createMap(b.getBindingKey(), destination);
                 getEventLogger().message(_logSubject, BindingMessages.DELETED(String.valueOf(bindArguments)));
-
+                onUnbind(new BindingIdentifier(b.getBindingKey(), destination));
                 _bindings.remove(b);
             }
         }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1780580&r1=1780579&r2=1780580&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java Fri Jan 27 16:38:23 2017
@@ -69,6 +69,7 @@ public class FanoutExchangeTest extends
     private QueueManagingVirtualHost _virtualHost;
     private TaskExecutor _taskExecutor;
 
+    @Override
     public void setUp()
     {
         Map<String,Object> attributes = new HashMap<String, Object>();
@@ -100,6 +101,8 @@ public class FanoutExchangeTest extends
         _exchange.open();
     }
 
+    @Override
+
     public void tearDown() throws Exception
     {
         super.tearDown();
@@ -125,98 +128,131 @@ public class FanoutExchangeTest extends
 
     public void testIsBoundStringMapAMQQueue()
     {
-        Queue<?> queue = bindQueue();
+        Queue<?> queue = bindQueue("matters");
         assertTrue("Should return true for a bound queue",
                 _exchange.isBound("matters", null, queue));
     }
 
     public void testIsBoundStringAMQQueue()
     {
-        Queue<?> queue = bindQueue();
+        Queue<?> queue = bindQueue("matters");
         assertTrue("Should return true for a bound queue",
                 _exchange.isBound("matters", queue));
     }
 
     public void testIsBoundAMQQueue()
     {
-        Queue<?> queue = bindQueue();
+        Queue<?> queue = bindQueue("matters");
         assertTrue("Should return true for a bound queue",
                 _exchange.isBound(queue));
     }
 
-    private Queue<?> bindQueue()
+
+    public void testRouteToDestination() throws Exception
     {
+        List<? extends BaseQueue> result;
         Queue<?> queue = mockQueue();
 
-        _exchange.addBinding("matters", queue, null);
-        return queue;
+        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+        assertEquals("Fanout exchange without bindings routed message to unexpected number of queues", 0, result.size());
+
+        _exchange.addBinding("key", queue, null);
+
+        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+        assertEquals("Fanout exchange with 1 binding routed message to unexpected number of queues", 1, result.size());
+
+        _exchange.deleteBinding("key", queue);
+        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+        assertEquals("Fanout exchange with no bindings routed message to unexpected number of queues", 0, result.size());
     }
 
-    private Queue<?> mockQueue()
+    public void testDestinationRemoved() throws Exception
     {
-        Queue queue = mock(Queue.class);
-        String name = UUID.randomUUID().toString();
-        when(queue.getName()).thenReturn(name);
-        when(queue.getVirtualHost()).thenReturn(_virtualHost);
-        when(queue.getCategoryClass()).thenReturn(Queue.class);
-        when(queue.getModel()).thenReturn(BrokerModel.getInstance());
-        TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
-        when(queue.getTaskExecutor()).thenReturn(taskExecutor);
-        when(queue.getChildExecutor()).thenReturn(taskExecutor);
-        when(queue.getParent()).thenReturn(_virtualHost);
-        when(_virtualHost.getAttainedQueue(eq(name))).thenReturn(queue);
-        RoutingResult result = new RoutingResult(null);
-        result.addQueue(queue);
-        when(queue.route(any(ServerMessage.class),anyString(),any(InstanceProperties.class))).thenReturn(result);
-        return queue;
+        List<? extends BaseQueue> result;
+        Queue<?> queue = mockQueue();
+
+        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+        assertEquals("Fanout exchange without bindings routed message to unexpected number of queues", 0, result.size());
+
+        _exchange.addBinding("key", queue, null);
+
+        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+        assertEquals("Fanout exchange with 1 binding routed message to unexpected number of queues", 1, result.size());
+
+        _exchange.destinationRemoved(queue);
+        result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+        assertEquals("Fanout exchange with no bindings routed message to unexpected number of queues", 0, result.size());
     }
 
+
     public void testRoutingWithSelectors() throws Exception
     {
-        Queue<?> queue1 = mockQueue();
-        Queue<?> queue2 = mockQueue();
+        Queue<?> queue = mockQueue();
 
+        List<? extends BaseQueue> result;
 
-        _exchange.addBinding("key",queue1, null);
-        _exchange.addBinding("key",queue2, null);
+        _exchange.addBinding("key2", queue, Collections.<String, Object>singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),"prop = True"));
 
-        List<? extends BaseQueue> result;
         result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
 
-        assertEquals("Expected message to be routed to both queues", 2, result.size());
-        assertTrue("Expected queue1 to be routed to", result.contains(queue1));
-        assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+        assertEquals("Expected matching message to be routed to queue", 1, result.size());
+        assertTrue("Expected matching message to be routed to queue", result.contains(queue));
 
-        _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
+        result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
 
-        result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
+        assertEquals("Expected non matching message not to be routed to queue", 0, result.size());
+    }
 
-        assertEquals("Expected message to be routed to both queues", 2, result.size());
-        assertTrue("Expected queue1 to be routed to", result.contains(queue1));
-        assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+    public void testMultipleBindings() throws Exception
+    {
+        Queue<?> queue1 = mockQueue();
+        Queue<?> queue2 = mockQueue();
+
+        List<? extends BaseQueue> result;
 
-        _exchange.deleteBinding("key",queue2);
+        _exchange.addBinding("key", queue1, null);
+        _exchange.addBinding("key", queue2, null);
 
         result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
 
         assertEquals("Expected message to be routed to both queues", 2, result.size());
-        assertTrue("Expected queue1 to be routed to", result.contains(queue1));
-        assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+        assertTrue("Expected queue1 to be in routing result", result.contains(queue1));
+        assertTrue("Expected queue2 to be in routing result", result.contains(queue2));
 
-        result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
-
-        assertEquals("Expected message to be routed to queue1 only", 1, result.size());
-        assertTrue("Expected queue1 to be routed to", result.contains(queue1));
-        assertFalse("Expected queue2 not to be routed to", result.contains(queue2));
-
-        _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
+        _exchange.addBinding("key1", queue2, null);
 
         result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
+
         assertEquals("Expected message to be routed to both queues", 2, result.size());
-        assertTrue("Expected queue1 to be routed to", result.contains(queue1));
-        assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+        assertTrue("Expected queue1 to be in routing result", result.contains(queue1));
+        assertTrue("Expected queue2 to be in routing result", result.contains(queue2));
+    }
 
+    private Queue<?> bindQueue(final String bindingKey)
+    {
+        Queue<?> queue = mockQueue();
+
+        _exchange.addBinding(bindingKey, queue, null);
+        return queue;
+    }
 
+    private Queue<?> mockQueue()
+    {
+        Queue queue = mock(Queue.class);
+        String name = UUID.randomUUID().toString();
+        when(queue.getName()).thenReturn(name);
+        when(queue.getVirtualHost()).thenReturn(_virtualHost);
+        when(queue.getCategoryClass()).thenReturn(Queue.class);
+        when(queue.getModel()).thenReturn(BrokerModel.getInstance());
+        TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
+        when(queue.getTaskExecutor()).thenReturn(taskExecutor);
+        when(queue.getChildExecutor()).thenReturn(taskExecutor);
+        when(queue.getParent()).thenReturn(_virtualHost);
+        when(_virtualHost.getAttainedQueue(eq(name))).thenReturn(queue);
+        RoutingResult result = new RoutingResult(null);
+        result.addQueue(queue);
+        when(queue.route(any(ServerMessage.class),anyString(),any(InstanceProperties.class))).thenReturn(result);
+        return queue;
     }
 
     private List<? extends BaseQueue> routeToQueues(final ServerMessage message,
@@ -301,12 +337,12 @@ public class FanoutExchangeTest extends
         return resultQueues;
     }
 
-    private ServerMessage mockMessage(boolean val)
+    private ServerMessage mockMessage(boolean propValue)
     {
         final AMQMessageHeader header = mock(AMQMessageHeader.class);
-        when(header.containsHeader("select")).thenReturn(true);
-        when(header.getHeader("select")).thenReturn(val);
-        when(header.getHeaderNames()).thenReturn(Collections.singleton("select"));
+        when(header.containsHeader("prop")).thenReturn(true);
+        when(header.getHeader("prop")).thenReturn(propValue);
+        when(header.getHeaderNames()).thenReturn(Collections.singleton("prop"));
         when(header.containsHeaders(anySet())).then(new Answer<Object>()
         {
             @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org