You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 00:47:23 UTC

svn commit: r1295341 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java

Author: rgodfrey
Date: Wed Feb 29 23:47:22 2012
New Revision: 1295341

URL: http://svn.apache.org/viewvc?rev=1295341&view=rev
Log:
QPID-3605 : renamed method, corrected brace style for ifs, added tests (per Robbie's review comments)

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1295341&r1=1295340&r2=1295341&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Wed Feb 29 23:47:22 2012
@@ -127,11 +127,13 @@ public class TopicExchange extends Abstr
             {
                 if(argumentsContainFilter(oldArgs))
                 {
-                    result.replaceQueueFilter(queue,createSelectorFilter(oldArgs, queue), createSelectorFilter(args, queue));
+                    result.replaceQueueFilter(queue,
+                                              createMessageFilter(oldArgs, queue),
+                                              createMessageFilter(args, queue));
                 }
                 else
                 {
-                    result.addFilteredQueue(queue,createSelectorFilter(args,queue));
+                    result.addFilteredQueue(queue, createMessageFilter(args, queue));
                     result.removeUnfilteredQueue(queue);
                 }
             }
@@ -140,7 +142,7 @@ public class TopicExchange extends Abstr
                 if(argumentsContainFilter(oldArgs))
                 {
                     result.addUnfilteredQueue(queue);
-                    result.removeFilteredQueue(queue, createSelectorFilter(oldArgs, queue));
+                    result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue));
                 }
                 else
                 {
@@ -161,7 +163,7 @@ public class TopicExchange extends Abstr
                 result = new TopicExchangeResult();
                 if(argumentsContainFilter(args))
                 {
-                    result.addFilteredQueue(queue, createSelectorFilter(args, queue));
+                    result.addFilteredQueue(queue, createMessageFilter(args, queue));
                 }
                 else
                 {
@@ -174,7 +176,7 @@ public class TopicExchange extends Abstr
             {
                 if(argumentsContainFilter(args))
                 {
-                    result.addFilteredQueue(queue, createSelectorFilter(args, queue));
+                    result.addFilteredQueue(queue, createMessageFilter(args, queue));
                 }
                 else
                 {
@@ -188,7 +190,7 @@ public class TopicExchange extends Abstr
 
     }
 
-    private MessageFilter createSelectorFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
+    private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
     {
         if(argumentsContainNoLocal(args))
         {
@@ -381,7 +383,7 @@ public class TopicExchange extends Abstr
             {
                 try
                 {
-                    result.removeFilteredQueue(binding.getQueue(), createSelectorFilter(bindingArgs, binding.getQueue()));
+                    result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue()));
                 }
                 catch (AMQInvalidArgumentException e)
                 {
@@ -478,8 +480,15 @@ public class TopicExchange extends Abstr
         @Override
         public boolean equals(Object o)
         {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
+            if (this == o)
+            {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
 
             NoLocalFilter that = (NoLocalFilter) o;
 
@@ -512,15 +521,25 @@ public class TopicExchange extends Abstr
         @Override
         public boolean equals(Object o)
         {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
 
             CompoundFilter that = (CompoundFilter) o;
 
             if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
+            {
                 return false;
+            }
             if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
+            {
                 return false;
+            }
 
             return true;
         }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java?rev=1295341&r1=1295340&r2=1295341&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java Wed Feb 29 23:47:22 2012
@@ -42,7 +42,7 @@ public class NoLocalAfterRecoveryTest ex
     protected final String MY_TOPIC_SUBSCRIPTION_NAME = this.getName();
     protected static final int SEND_COUNT = 10;
 
-    public void test() throws Exception
+    public void testNoLocalNotQueued() throws Exception
     {
         if(!isBrokerStorePersistent())
         {
@@ -73,6 +73,8 @@ public class NoLocalAfterRecoveryTest ex
         assertEquals("Normal Subscriber Received no messages",
                      SEND_COUNT, received.size());
         session.commit();
+
+        normalSubscriber.close();
         connection.close();
 
         //We didn't receive the messages on the durable queue for the no-local subscriber
@@ -94,6 +96,67 @@ public class NoLocalAfterRecoveryTest ex
         session2.commit();
         assertEquals("No Local Subscriber Received messages", 0, received.size());
 
+        noLocalSubscriber2.close();
+
+
+    }
+
+
+    public void testNonNoLocalQueued() throws Exception
+    {
+        if(!isBrokerStorePersistent())
+        {
+            fail("This test requires a broker with a persistent store");
+        }
+
+        Connection connection = getConnection();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Topic topic = session.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+
+        TopicSubscriber noLocalSubscriber =
+                session.createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", null, true);
+
+
+        sendMessage(session, topic, SEND_COUNT);
+
+        // Check messages can be received as expected.
+        connection.start();
+
+        List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
+        assertEquals("No Local Subscriber Received messages", 0, received.size());
+
+
+
+        session.commit();
+
+        Connection connection3 = getConnection();
+        Session session3 = connection3.createSession(true, Session.SESSION_TRANSACTED);
+        sendMessage(session3, topic, SEND_COUNT);
+
+
+        connection.close();
+
+        //We didn't receive the messages on the durable queue for the no-local subscriber
+        //so they are still on the broker. Restart the broker, prompting their recovery.
+        restartBroker();
+
+        Connection connection2 = getConnection();
+        connection2.start();
+
+        Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+        Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
+
+        TopicSubscriber noLocalSubscriber2 =
+                session2.createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",null, true);
+
+        // The NO-local subscriber should receive messages sent from connection3
+        received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
+        session2.commit();
+        assertEquals("No Local Subscriber did not receive expected messages", SEND_COUNT, received.size());
+
+        noLocalSubscriber2.close();
+
+
     }
 
     protected List<Message> receiveMessage(MessageConsumer messageConsumer,



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org