You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 00:47:23 UTC
svn commit: r1295341 - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
Author: rgodfrey
Date: Wed Feb 29 23:47:22 2012
New Revision: 1295341
URL: http://svn.apache.org/viewvc?rev=1295341&view=rev
Log:
QPID-3605 : renamed method, corrected brace style for ifs, added tests (per Robbie's review comments)
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1295341&r1=1295340&r2=1295341&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Wed Feb 29 23:47:22 2012
@@ -127,11 +127,13 @@ public class TopicExchange extends Abstr
{
if(argumentsContainFilter(oldArgs))
{
- result.replaceQueueFilter(queue,createSelectorFilter(oldArgs, queue), createSelectorFilter(args, queue));
+ result.replaceQueueFilter(queue,
+ createMessageFilter(oldArgs, queue),
+ createMessageFilter(args, queue));
}
else
{
- result.addFilteredQueue(queue,createSelectorFilter(args,queue));
+ result.addFilteredQueue(queue, createMessageFilter(args, queue));
result.removeUnfilteredQueue(queue);
}
}
@@ -140,7 +142,7 @@ public class TopicExchange extends Abstr
if(argumentsContainFilter(oldArgs))
{
result.addUnfilteredQueue(queue);
- result.removeFilteredQueue(queue, createSelectorFilter(oldArgs, queue));
+ result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue));
}
else
{
@@ -161,7 +163,7 @@ public class TopicExchange extends Abstr
result = new TopicExchangeResult();
if(argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createSelectorFilter(args, queue));
+ result.addFilteredQueue(queue, createMessageFilter(args, queue));
}
else
{
@@ -174,7 +176,7 @@ public class TopicExchange extends Abstr
{
if(argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createSelectorFilter(args, queue));
+ result.addFilteredQueue(queue, createMessageFilter(args, queue));
}
else
{
@@ -188,7 +190,7 @@ public class TopicExchange extends Abstr
}
- private MessageFilter createSelectorFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
+ private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
{
if(argumentsContainNoLocal(args))
{
@@ -381,7 +383,7 @@ public class TopicExchange extends Abstr
{
try
{
- result.removeFilteredQueue(binding.getQueue(), createSelectorFilter(bindingArgs, binding.getQueue()));
+ result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue()));
}
catch (AMQInvalidArgumentException e)
{
@@ -478,8 +480,15 @@ public class TopicExchange extends Abstr
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
NoLocalFilter that = (NoLocalFilter) o;
@@ -512,15 +521,25 @@ public class TopicExchange extends Abstr
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
CompoundFilter that = (CompoundFilter) o;
if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
+ {
return false;
+ }
if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
+ {
return false;
+ }
return true;
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java?rev=1295341&r1=1295340&r2=1295341&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java Wed Feb 29 23:47:22 2012
@@ -42,7 +42,7 @@ public class NoLocalAfterRecoveryTest ex
protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName();
protected static final int SEND_COUNT = 10;
- public void test() throws Exception
+ public void testNoLocalNotQueued() throws Exception
{
if(!isBrokerStorePersistent())
{
@@ -73,6 +73,8 @@ public class NoLocalAfterRecoveryTest ex
assertEquals("Normal Subscriber Received no messages",
SEND_COUNT, received.size());
session.commit();
+
+ normalSubscriber.close();
connection.close();
//We didn't receive the messages on the durable queue for the no-local subscriber
@@ -94,6 +96,67 @@ public class NoLocalAfterRecoveryTest ex
session2.commit();
assertEquals("No Local Subscriber Received messages", 0, received.size());
+ noLocalSubscriber2.close();
+
+
+ }
+
+
+ public void testNonNoLocalQueued() throws Exception
+ {
+ if(!isBrokerStorePersistent())
+ {
+ fail("This test requires a broker with a persistent store");
+ }
+
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+
+ TopicSubscriber noLocalSubscriber =
+ session.createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", null, true);
+
+
+ sendMessage(session, topic, SEND_COUNT);
+
+ // Check messages can be received as expected.
+ connection.start();
+
+ List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
+ assertEquals("No Local Subscriber Received messages", 0, received.size());
+
+
+
+ session.commit();
+
+ Connection connection3 = getConnection();
+ Session session3 = connection3.createSession(true, Session.SESSION_TRANSACTED);
+ sendMessage(session3, topic, SEND_COUNT);
+
+
+ connection.close();
+
+ //We didn't receive the messages on the durable queue for the no-local subscriber
+ //so they are still on the broker. Restart the broker, prompting their recovery.
+ restartBroker();
+
+ Connection connection2 = getConnection();
+ connection2.start();
+
+ Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+
+ TopicSubscriber noLocalSubscriber2 =
+ session2.createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",null, true);
+
+ // The NO-local subscriber should receive messages sent from connection3
+ received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
+ session2.commit();
+ assertEquals("No Local Subscriber did not receive expected messages", SEND_COUNT, received.size());
+
+ noLocalSubscriber2.close();
+
+
}
protected List<Message> receiveMessage(MessageConsumer messageConsumer,
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org