You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/19 16:42:54 UTC

svn commit: r530432 - in /incubator/qpid/branches/M2/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/queue/ cluster/src/main/java/org/apache/qpid/server/queue/ systests/src/main/java/org/apache/qpid/server...

Author: ritchiem
Date: Thu Apr 19 07:42:53 2007
New Revision: 530432

URL: http://svn.apache.org/viewvc?view=rev&rev=530432
Log:
QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test.
AMQChannel remove comment around setPublisher - this is used by noLocal implementation.

Subscription - rename of hasFilters to filtersMessages
AMQQueue/RemoteSubscriptionImpl/SubscriptionTestHelper/SubscriptionSet - rename of hasFilters to filtersMessages
SubscriptionImpl - rename of hasFilters to filtersMessages and changes to include noLocal in that check.

TopicSessionTest - Additional testing for NoLocal to ensure.

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Apr 19 07:42:53 2007
@@ -197,7 +197,6 @@
 
         _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
                                          _txnContext);
-        // TODO: used in clustering only I think (RG)
         _currentMessage.setPublisher(publisher);
     }
 

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Apr 19 07:42:53 2007
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.queue;
 
 import java.text.MessageFormat;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -237,8 +236,10 @@
 
     /**
      * Returns messages within the given range of message Ids
+     *
      * @param fromMessageId
      * @param toMessageId
+     *
      * @return List of messages
      */
     public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
@@ -253,6 +254,7 @@
 
     /**
      * @param messageId
+     *
      * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
      */
     public AMQMessage getMessageOnTheQueue(long messageId)
@@ -267,10 +269,10 @@
 
     /**
      * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
-     * moving messages (stop the async delivery) - get all the messages available in the given message
-     * id range - setup the other queue for moving messages (stop the async delivery) - send these
-     * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful,
-     * remove messages from this queue - remove locks from both queues and start async delivery
+     * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup
+     * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue
+     * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
+     * locks from both queues and start async delivery
      *
      * @param fromMessageId
      * @param toMessageId
@@ -442,7 +444,7 @@
         Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
                                                                             filters, noLocal, this);
 
-        if (subscription.hasFilters())
+        if (subscription.filtersMessages())
         {
             if (_deliveryMgr.hasQueuedMessages())
             {
@@ -641,7 +643,7 @@
         {
             _totalMessagesReceived.incrementAndGet();
         }
-        
+
         try
         {
             _managedObject.checkForNotification(msg);

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Thu Apr 19 07:42:53 2007
@@ -32,7 +32,7 @@
 
     void queueDeleted(AMQQueue queue) throws AMQException;
 
-    boolean hasFilters();
+    boolean filtersMessages();
 
     boolean hasInterest(AMQMessage msg);
 

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Apr 19 07:42:53 2007
@@ -158,7 +158,7 @@
         }
 
 
-        if (_filters != null)
+        if (filtersMessages())
         {
             _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
         }
@@ -346,9 +346,9 @@
         channel.queueDeleted(queue);
     }
 
-    public boolean hasFilters()
+    public boolean filtersMessages()
     {
-        return _filters != null;
+        return _filters != null || _noLocal;
     }
 
     public boolean hasInterest(AMQMessage msg)
@@ -363,7 +363,10 @@
 //            return false;
         }
 
-        if (_noLocal)
+        final AMQProtocolSession publisher = msg.getPublisher();
+
+        //todo - client id should be recoreded and this test removed but handled below
+        if (_noLocal && publisher != null)
         {
             // We don't want local messages so check to see if message is one we sent
             Object localInstance;
@@ -372,8 +375,9 @@
             if ((protocolSession.getClientProperties() != null) &&
                 (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
             {
-                if ((msg.getPublisher().getClientProperties() != null) &&
-                    (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+
+                if ((publisher.getClientProperties() != null) &&
+                    (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
                 {
                     if (localInstance == msgInstance || localInstance.equals(msgInstance))
                     {
@@ -388,8 +392,11 @@
             }
             else
             {
+
                 localInstance = protocolSession.getClientIdentifier();
-                msgInstance = msg.getPublisher().getClientIdentifier();
+                //todo - client id should be recoreded and this test removed but handled here
+
+                msgInstance = publisher.getClientIdentifier();
                 if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
                 {
                     if (_logger.isTraceEnabled())
@@ -399,7 +406,6 @@
                     }
                     return false;
                 }
-
             }
 
 
@@ -623,7 +629,7 @@
             return _resendQueue;
         }
 
-        if (_filters != null)
+        if (filtersMessages())
         {
             if (isAutoClose())
             {

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Thu Apr 19 07:42:53 2007
@@ -157,7 +157,7 @@
                     //FIXME the queue could be full of sent messages.
                     // Either need to clean all PDQs after sending a message
                     // OR have a clean up thread that runs the PDQs expunging the messages.
-                    if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
+                    if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
                     {
                         return subscription;
                     }

Modified: incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Thu Apr 19 07:42:53 2007
@@ -102,7 +102,7 @@
         }
     }
 
-    public boolean hasFilters()
+    public boolean filtersMessages()
     {
         return false;
     }

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Thu Apr 19 07:42:53 2007
@@ -87,7 +87,7 @@
     {
     }
 
-    public boolean hasFilters()
+    public boolean filtersMessages()
     {
         return false;
     }