You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/08/09 00:56:17 UTC

svn commit: r1155138 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ main/java/org/apache/qpid/server/subscription/ test/java/org/apache/qpid/server/exchange/ test/java/org/apache/qpid/server/queue/

Author: robbie
Date: Mon Aug  8 22:56:17 2011
New Revision: 1155138

URL: http://svn.apache.org/viewvc?rev=1155138&view=rev
Log:
QPID-3387: use the subscription ID to track rejection rather than the subscription itself

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1155138&r1=1155137&r2=1155138&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Aug  8 22:56:17 2011
@@ -202,9 +202,7 @@ public interface QueueEntry extends Comp
 
     void reject();
 
-    void reject(Subscription subscription);
-
-    boolean isRejectedBy(Subscription subscription);
+    boolean isRejectedBy(long subscriptionId);
 
     void dequeue();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1155138&r1=1155137&r2=1155138&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Aug  8 22:56:17 2011
@@ -51,7 +51,7 @@ public class QueueEntryImpl implements Q
 
     private MessageReference _message;
 
-    private Set<Subscription> _rejectedBy = null;
+    private Set<Long> _rejectedBy = null;
 
     private volatile EntryState _state = AVAILABLE_STATE;
 
@@ -325,19 +325,16 @@ public class QueueEntryImpl implements Q
 
     public void reject()
     {
-        reject(getDeliveredSubscription());
-    }
+        Subscription subscription = getDeliveredSubscription();
 
-    public void reject(Subscription subscription)
-    {
         if (subscription != null)
         {
             if (_rejectedBy == null)
             {
-                _rejectedBy = new HashSet<Subscription>();
+                _rejectedBy = new HashSet<Long>();
             }
 
-            _rejectedBy.add(subscription);
+            _rejectedBy.add(subscription.getSubscriptionID());
         }
         else
         {
@@ -345,12 +342,12 @@ public class QueueEntryImpl implements Q
         }
     }
 
-    public boolean isRejectedBy(Subscription subscription)
+    public boolean isRejectedBy(long subscriptionId)
     {
 
         if (_rejectedBy != null) // We have subscriptions that rejected this message
         {
-            return _rejectedBy.contains(subscription);
+            return _rejectedBy.contains(subscriptionId);
         }
         else // This messasge hasn't been rejected yet.
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1155138&r1=1155137&r2=1155138&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Mon Aug  8 22:56:17 2011
@@ -475,7 +475,7 @@ public abstract class SubscriptionImpl i
 
 
         //check that the message hasn't been rejected
-        if (entry.isRejectedBy(this))
+        if (entry.isRejectedBy(getSubscriptionID()))
         {
             if (_logger.isDebugEnabled())
             {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1155138&r1=1155137&r2=1155138&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Mon Aug  8 22:56:17 2011
@@ -203,7 +203,7 @@ public class Subscription_0_10 implement
 
 
         //check that the message hasn't been rejected
-        if (entry.isRejectedBy(this))
+        if (entry.isRejectedBy(getSubscriptionID()))
         {
 
             return false;

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1155138&r1=1155137&r2=1155138&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Aug  8 22:56:17 2011
@@ -428,21 +428,11 @@ public class AbstractHeadersExchangeTest
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void reject(Subscription subscription)
-                {
-                    //To change body of implemented methods use File | Settings | File Templates.
-                }
-
-                public boolean isRejectedBy(Subscription subscription)
+                public boolean isRejectedBy(long subscriptionId)
                 {
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void requeue(Subscription subscription) 
-                {
-                    //To change body of implemented methods use File | Settings | File Templates.
-                }
-
                 public void dequeue()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1155138&r1=1155137&r2=1155138&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Mon Aug  8 22:56:17 2011
@@ -139,7 +139,7 @@ public class MockQueueEntry implements Q
     }
 
 
-    public boolean isRejectedBy(Subscription subscription)
+    public boolean isRejectedBy(long subscriptionId)
     {
 
         return false;
@@ -153,13 +153,6 @@ public class MockQueueEntry implements Q
     }
 
 
-    public void reject(Subscription subscription)
-    {
-
-
-    }
-
-
     public void release()
     {
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java?rev=1155138&r1=1155137&r2=1155138&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java Mon Aug  8 22:56:17 2011
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry.EntryState;
 import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.subscription.Subscription;
 
 /**
  * Tests for {@link QueueEntryImpl}
@@ -210,4 +211,37 @@ public class QueueEntryImplTest extends 
         }
         return state;
     }
+
+    /**
+     * Tests rejecting a queue entry records the Subscription ID
+     * for later verification by isRejectedBy(subscriptionId).
+     */
+    public void testRejectAndRejectedBy()
+    {
+        Subscription sub = new MockSubscription();
+        long subId = sub.getSubscriptionID();
+
+        assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+        assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired());
+
+        //acquire, reject, and release the message using the subscription
+        assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub));
+        _queueEntry.reject();
+        _queueEntry.release();
+
+        //verify the rejection is recorded
+        assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+
+        //repeat rejection using a second subscription
+        Subscription sub2 = new MockSubscription();
+        long sub2Id = sub2.getSubscriptionID();
+
+        assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+        assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
+        _queueEntry.reject();
+
+        //verify it still records being rejected by both subscriptions
+        assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+        assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org