You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/01/27 16:38:23 UTC
svn commit: r1780580 - in /qpid/java/trunk/broker-core/src:
main/java/org/apache/qpid/server/exchange/AbstractExchange.java
test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
Author: kwall
Date: Fri Jan 27 16:38:23 2017
New Revision: 1780580
URL: http://svn.apache.org/viewvc?rev=1780580&view=rev
Log:
QPID-6028: [Java Broker] Ensure queue removal triggers exchange unbind
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1780580&r1=1780579&r2=1780580&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Jan 27 16:38:23 2017
@@ -248,7 +248,7 @@ public abstract class AbstractExchange<T
final Map<String, Object> bindArguments =
UNBIND_ARGUMENTS_CREATOR.createMap(b.getBindingKey(), destination);
getEventLogger().message(_logSubject, BindingMessages.DELETED(String.valueOf(bindArguments)));
-
+ onUnbind(new BindingIdentifier(b.getBindingKey(), destination));
_bindings.remove(b);
}
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java?rev=1780580&r1=1780579&r2=1780580&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java Fri Jan 27 16:38:23 2017
@@ -69,6 +69,7 @@ public class FanoutExchangeTest extends
private QueueManagingVirtualHost _virtualHost;
private TaskExecutor _taskExecutor;
+ @Override
public void setUp()
{
Map<String,Object> attributes = new HashMap<String, Object>();
@@ -100,6 +101,8 @@ public class FanoutExchangeTest extends
_exchange.open();
}
+ @Override
+
public void tearDown() throws Exception
{
super.tearDown();
@@ -125,98 +128,131 @@ public class FanoutExchangeTest extends
public void testIsBoundStringMapAMQQueue()
{
- Queue<?> queue = bindQueue();
+ Queue<?> queue = bindQueue("matters");
assertTrue("Should return true for a bound queue",
_exchange.isBound("matters", null, queue));
}
public void testIsBoundStringAMQQueue()
{
- Queue<?> queue = bindQueue();
+ Queue<?> queue = bindQueue("matters");
assertTrue("Should return true for a bound queue",
_exchange.isBound("matters", queue));
}
public void testIsBoundAMQQueue()
{
- Queue<?> queue = bindQueue();
+ Queue<?> queue = bindQueue("matters");
assertTrue("Should return true for a bound queue",
_exchange.isBound(queue));
}
- private Queue<?> bindQueue()
+
+ public void testRouteToDestination() throws Exception
{
+ List<? extends BaseQueue> result;
Queue<?> queue = mockQueue();
- _exchange.addBinding("matters", queue, null);
- return queue;
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange without bindings routed message to unexpected number of queues", 0, result.size());
+
+ _exchange.addBinding("key", queue, null);
+
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange with 1 binding routed message to unexpected number of queues", 1, result.size());
+
+ _exchange.deleteBinding("key", queue);
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange with no bindings routed message to unexpected number of queues", 0, result.size());
}
- private Queue<?> mockQueue()
+ public void testDestinationRemoved() throws Exception
{
- Queue queue = mock(Queue.class);
- String name = UUID.randomUUID().toString();
- when(queue.getName()).thenReturn(name);
- when(queue.getVirtualHost()).thenReturn(_virtualHost);
- when(queue.getCategoryClass()).thenReturn(Queue.class);
- when(queue.getModel()).thenReturn(BrokerModel.getInstance());
- TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
- when(queue.getTaskExecutor()).thenReturn(taskExecutor);
- when(queue.getChildExecutor()).thenReturn(taskExecutor);
- when(queue.getParent()).thenReturn(_virtualHost);
- when(_virtualHost.getAttainedQueue(eq(name))).thenReturn(queue);
- RoutingResult result = new RoutingResult(null);
- result.addQueue(queue);
- when(queue.route(any(ServerMessage.class),anyString(),any(InstanceProperties.class))).thenReturn(result);
- return queue;
+ List<? extends BaseQueue> result;
+ Queue<?> queue = mockQueue();
+
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange without bindings routed message to unexpected number of queues", 0, result.size());
+
+ _exchange.addBinding("key", queue, null);
+
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange with 1 binding routed message to unexpected number of queues", 1, result.size());
+
+ _exchange.destinationRemoved(queue);
+ result = routeToQueues(mockMessage(true), null, InstanceProperties.EMPTY);
+ assertEquals("Fanout exchange with no bindings routed message to unexpected number of queues", 0, result.size());
}
+
public void testRoutingWithSelectors() throws Exception
{
- Queue<?> queue1 = mockQueue();
- Queue<?> queue2 = mockQueue();
+ Queue<?> queue = mockQueue();
+ List<? extends BaseQueue> result;
- _exchange.addBinding("key",queue1, null);
- _exchange.addBinding("key",queue2, null);
+ _exchange.addBinding("key2", queue, Collections.<String, Object>singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),"prop = True"));
- List<? extends BaseQueue> result;
result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
- assertEquals("Expected message to be routed to both queues", 2, result.size());
- assertTrue("Expected queue1 to be routed to", result.contains(queue1));
- assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+ assertEquals("Expected matching message to be routed to queue", 1, result.size());
+ assertTrue("Expected matching message to be routed to queue", result.contains(queue));
- _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
+ result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
- result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
+ assertEquals("Expected non matching message not to be routed to queue", 0, result.size());
+ }
- assertEquals("Expected message to be routed to both queues", 2, result.size());
- assertTrue("Expected queue1 to be routed to", result.contains(queue1));
- assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+ public void testMultipleBindings() throws Exception
+ {
+ Queue<?> queue1 = mockQueue();
+ Queue<?> queue2 = mockQueue();
+
+ List<? extends BaseQueue> result;
- _exchange.deleteBinding("key",queue2);
+ _exchange.addBinding("key", queue1, null);
+ _exchange.addBinding("key", queue2, null);
result = routeToQueues(mockMessage(true), "", InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
- assertTrue("Expected queue1 to be routed to", result.contains(queue1));
- assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+ assertTrue("Expected queue1 to be in routing result", result.contains(queue1));
+ assertTrue("Expected queue2 to be in routing result", result.contains(queue2));
- result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
-
- assertEquals("Expected message to be routed to queue1 only", 1, result.size());
- assertTrue("Expected queue1 to be routed to", result.contains(queue1));
- assertFalse("Expected queue2 not to be routed to", result.contains(queue2));
-
- _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
+ _exchange.addBinding("key1", queue2, null);
result = routeToQueues(mockMessage(false), "", InstanceProperties.EMPTY);
+
assertEquals("Expected message to be routed to both queues", 2, result.size());
- assertTrue("Expected queue1 to be routed to", result.contains(queue1));
- assertTrue("Expected queue2 to be routed to", result.contains(queue2));
+ assertTrue("Expected queue1 to be in routing result", result.contains(queue1));
+ assertTrue("Expected queue2 to be in routing result", result.contains(queue2));
+ }
+ private Queue<?> bindQueue(final String bindingKey)
+ {
+ Queue<?> queue = mockQueue();
+
+ _exchange.addBinding(bindingKey, queue, null);
+ return queue;
+ }
+ private Queue<?> mockQueue()
+ {
+ Queue queue = mock(Queue.class);
+ String name = UUID.randomUUID().toString();
+ when(queue.getName()).thenReturn(name);
+ when(queue.getVirtualHost()).thenReturn(_virtualHost);
+ when(queue.getCategoryClass()).thenReturn(Queue.class);
+ when(queue.getModel()).thenReturn(BrokerModel.getInstance());
+ TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
+ when(queue.getTaskExecutor()).thenReturn(taskExecutor);
+ when(queue.getChildExecutor()).thenReturn(taskExecutor);
+ when(queue.getParent()).thenReturn(_virtualHost);
+ when(_virtualHost.getAttainedQueue(eq(name))).thenReturn(queue);
+ RoutingResult result = new RoutingResult(null);
+ result.addQueue(queue);
+ when(queue.route(any(ServerMessage.class),anyString(),any(InstanceProperties.class))).thenReturn(result);
+ return queue;
}
private List<? extends BaseQueue> routeToQueues(final ServerMessage message,
@@ -301,12 +337,12 @@ public class FanoutExchangeTest extends
return resultQueues;
}
- private ServerMessage mockMessage(boolean val)
+ private ServerMessage mockMessage(boolean propValue)
{
final AMQMessageHeader header = mock(AMQMessageHeader.class);
- when(header.containsHeader("select")).thenReturn(true);
- when(header.getHeader("select")).thenReturn(val);
- when(header.getHeaderNames()).thenReturn(Collections.singleton("select"));
+ when(header.containsHeader("prop")).thenReturn(true);
+ when(header.getHeader("prop")).thenReturn(propValue);
+ when(header.getHeaderNames()).thenReturn(Collections.singleton("prop"));
when(header.containsHeaders(anySet())).then(new Answer<Object>()
{
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org