You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/17 18:08:04 UTC

svn commit: r529659 - in /incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server: AMQChannel.java handler/BasicRejectMethodHandler.java queue/AMQMessage.java queue/ConcurrentSelectorDeliveryManager.java queue/SubscriptionImpl.java

Author: ritchiem
Date: Tue Apr 17 09:08:00 2007
New Revision: 529659

URL: http://svn.apache.org/viewvc?view=rev&rev=529659
Log:
QPID-454 The message 'taken' notion is currently per message, but it should be per message per queue.
AMQChannel - pass queue in on all take/release/getDeliveredSubscription calls
BasicRejectMethodHandler - pass queue in on getDeliveredSubscription calls
AMQMessage - Changes to require AMQQueue on all take/release/getDeliveredSubscription calls
ConcurrentSelectorDeliveryManager - pass queue in on take/release/getDeliveredSubscription calls
SubscriptionImpl - pass queue in on release calls

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=529659&r1=529658&r2=529659
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Apr 17 09:08:00 2007
@@ -472,7 +472,7 @@
             if (unacked.queue != null)
             {
                 // Ensure message is released for redelivery
-                unacked.message.release();
+                unacked.message.release(unacked.queue);
 
                 // Mark message redelivered
                 unacked.message.setRedelivered(true);
@@ -503,7 +503,10 @@
         {
 
             // Ensure message is released for redelivery
-            unacked.message.release();
+            if (unacked.queue != null)
+            {
+                unacked.message.release(unacked.queue);
+            }
 
             // Mark message redelivered
             unacked.message.setRedelivered(true);
@@ -672,14 +675,14 @@
 //            else
 //            {
             //release to allow it to be delivered
-            msg.release();
+            msg.release(message.queue);
 
             // Without any details from the client about what has been processed we have to mark
             // all messages in the unacked map as redelivered.
             msg.setRedelivered(true);
 
 
-            Subscription sub = msg.getDeliveredSubscription();
+            Subscription sub = msg.getDeliveredSubscription(message.queue);
 
             if (sub != null)
             {
@@ -753,7 +756,7 @@
         // Process Messages to Requeue at the front of the queue
         for (UnacknowledgedMessage message : msgToRequeue)
         {
-            message.message.release();
+            message.message.release(message.queue);
             message.message.setRedelivered(true);
 
             deliveryContext.deliver(message.message, message.queue, true);

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?view=diff&rev=529659&r1=529658&r2=529659
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Tue Apr 17 09:08:00 2007
@@ -98,7 +98,7 @@
             // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
 //            if (!evt.getMethod().resend)
             {
-                message.message.reject(message.message.getDeliveredSubscription());
+                message.message.reject(message.message.getDeliveredSubscription(message.queue));
             }
 
             if (evt.getMethod().requeue)

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=529659&r1=529658&r2=529659
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Apr 17 09:08:00 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -42,6 +43,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -77,20 +80,19 @@
      */
     private boolean _immediate;
 
-    private AtomicBoolean _taken = new AtomicBoolean(false);
-
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
-    private Subscription _takenBySubcription;
-
     private Set<Subscription> _rejectedBy = null;
+    private Map<AMQQueue, AtomicBoolean> _takenMap;
+    private Map<AMQQueue, Subscription> _takenBySubcriptionMap;
 
-    public boolean isTaken()
+    public boolean isTaken(AMQQueue queue)
     {
-        return _taken.get();
+        return _takenMap.get(queue).get();
     }
 
     private final int hashcode = System.identityHashCode(this);
+
     public String debugIdentity()
     {
         return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
@@ -202,10 +204,12 @@
         _immediate = info.isImmediate();
         _transientMessageData.setMessagePublishInfo(info);
 
-        _taken = new AtomicBoolean(false);
+        _takenMap = null;
+        _takenBySubcriptionMap = null;
+
         if (_log.isDebugEnabled())
         {
-            _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")");
+            _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity() + ")");
         }
     }
 
@@ -318,8 +322,15 @@
 
         // enqueuing the messages ensure that if required the destinations are recorded to a
         // persistent store
+
+        int mapSize = _transientMessageData.getDestinationQueues().size();
+
+        _takenMap = new HashMap<AMQQueue, AtomicBoolean>(mapSize);
+        _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(mapSize);
+
         for (AMQQueue q : _transientMessageData.getDestinationQueues())
         {
+            _takenMap.put(q, new AtomicBoolean(false));
             _messageHandle.enqueue(storeContext, _messageId, q);
         }
 
@@ -356,12 +367,13 @@
     }
 
     /**
-     * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation.
+     * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
+     * operation.
      */
     public AMQMessage takeReference()
     {
         _referenceCount.incrementAndGet();
-	return this;
+        return this;
     }
 
     /** Threadsafe. Increment the reference count on the message. */
@@ -378,9 +390,10 @@
      * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
      * message store.
      *
+     * @param storeContext
+     *
      * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
      *                                 failed
-     * @param storeContext
      */
     public void decrementReference(StoreContext storeContext) throws MessageCleanupException
     {
@@ -451,27 +464,33 @@
     }
 
 
-    public boolean taken(Subscription sub)
+    public boolean taken(AMQQueue queue, Subscription sub)
     {
-        if (_taken.getAndSet(true))
+        synchronized (queue)
         {
-            return true;
-        }
-        else
-        {
-            _takenBySubcription = sub;
-            return false;
+            if (_takenMap.get(queue).getAndSet(true))
+            {
+                return true;
+            }
+            else
+            {
+                _takenBySubcriptionMap.put(queue, sub);
+                return false;
+            }
         }
     }
 
-    public void release()
+    public void release(AMQQueue queue)
     {
         if (_log.isTraceEnabled())
         {
             _log.trace("Releasing Message:" + debugIdentity());
         }
-        _taken.set(false);
-        _takenBySubcription = null;
+        synchronized (queue)
+        {
+            _takenMap.get(queue).set(false);
+            _takenBySubcriptionMap.put(queue, null);
+        }
     }
 
     public boolean checkToken(Object token)
@@ -600,7 +619,7 @@
             for (AMQQueue q : destinationQueues)
             {
                 //Increment the references to this message for each queue delivery.
-                incrementReference();                
+                incrementReference();
                 //normal deliver so add this message at the end.
                 _txnContext.deliver(this, q, false);
             }
@@ -824,13 +843,13 @@
 
     public String toString()
     {
-        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
-               _taken + " by:" + _takenBySubcription;
+        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+               _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
     }
 
-    public Subscription getDeliveredSubscription()
+    public Subscription getDeliveredSubscription(AMQQueue queue)
     {
-        return _takenBySubcription;
+        return _takenBySubcriptionMap.get(queue);
     }
 
     public void reject(Subscription subscription)

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=529659&r1=529658&r2=529659
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Apr 17 09:08:00 2007
@@ -210,6 +210,7 @@
 
     /**
      * Returns all the messages in the Queue
+     *
      * @return List of messages
      */
     public List<AMQMessage> getMessages()
@@ -222,14 +223,16 @@
             list.add(message);
         }
         _lock.unlock();
-        
+
         return list;
     }
 
     /**
      * Returns messages within the range of given messageIds
+     *
      * @param fromMessageId
      * @param toMessageId
+     *
      * @return
      */
     public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
@@ -242,7 +245,7 @@
         long maxMessageCount = toMessageId - fromMessageId + 1;
 
         _lock.lock();
-        
+
         List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
 
         for (AMQMessage message : _messages)
@@ -399,7 +402,7 @@
     public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
     {
         _lock.lock();
-        
+
         AMQMessage message = _messages.poll();
         if (message != null)
         {
@@ -432,9 +435,7 @@
         return count;
     }
 
-    /**
-        This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. 
-     */
+    /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
     private AMQMessage getNextMessage() throws AMQException
     {
         return getNextMessage(_messages, null);
@@ -445,7 +446,7 @@
         AMQMessage message = messages.peek();
 
         //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.)
-        while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub))
+        while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(_queue, sub))
         {
             //remove the already taken message
             AMQMessage removed = messages.poll();
@@ -562,7 +563,7 @@
         }
         catch (AMQException e)
         {
-            message.release();
+            message.release(_queue);
             _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
         }
     }
@@ -723,7 +724,7 @@
                             _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
-                        msg.taken(s);
+                        msg.taken(_queue, s);
                         //Deliver the message
                         s.send(msg, _queue);
                     }
@@ -737,7 +738,7 @@
                     }
                 }
 
-                if (!msg.isTaken())
+                if (!msg.isTaken(_queue))
                 {
                     if (_log.isInfoEnabled())
                     {

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=529659&r1=529658&r2=529659
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Apr 17 09:08:00 2007
@@ -558,7 +558,7 @@
                     _logger.trace("Removed for resending:" + resent.debugIdentity());
                 }
 
-                resent.release();
+                resent.release(_queue);
                 _queue.subscriberHasPendingResend(false, this, resent);
 
                 try