You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/19 17:10:11 UTC

svn commit: r530444 - /incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java

Author: ritchiem
Date: Thu Apr 19 08:10:10 2007
New Revision: 530444

URL: http://svn.apache.org/viewvc?view=rev&rev=530444
Log:
QPID-454 - The message 'taken' notion was tracked per message. Adjusted it to be tracked per message per queue.

Previous commit was not sufficiently tested and other bugs were causing problems that were not related to this change.

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=530444&r1=530443&r2=530444
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Apr 19 08:10:10 2007
@@ -80,18 +80,17 @@
      */
     private boolean _immediate;
 
-    private AtomicBoolean _taken = new AtomicBoolean(false);
+    //    private Subscription _takenBySubcription;
+    //    private AtomicBoolean _taken = new AtomicBoolean(false);
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
-    private Subscription _takenBySubcription;
+
     private Set<Subscription> _rejectedBy = null;
+
+
     private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
     private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
 
-    public boolean isTaken(AMQQueue queue)
-    {
-        return _taken.get();
-    }
 
     private final int hashcode = System.identityHashCode(this);
 
@@ -206,7 +205,7 @@
         _immediate = info.isImmediate();
         _transientMessageData.setMessagePublishInfo(info);
 
-        _taken = new AtomicBoolean(false);
+//        _taken = new AtomicBoolean(false);
 
         if (_log.isDebugEnabled())
         {
@@ -326,7 +325,6 @@
 
         for (AMQQueue q : _transientMessageData.getDestinationQueues())
         {
-            _takenMap.put(q, new AtomicBoolean(false));
             _messageHandle.enqueue(storeContext, _messageId, q);
         }
 
@@ -459,17 +457,53 @@
         return _deliveredToConsumer;
     }
 
-
-    public boolean taken(AMQQueue queue, Subscription sub)
+    public boolean isTaken(AMQQueue queue)
     {
-        if (_taken.getAndSet(true))
+        //return _taken.get();
+
+        synchronized (this)
         {
-            return true;
+            AtomicBoolean taken = _takenMap.get(queue);
+            if (taken == null)
+            {
+                taken = new AtomicBoolean(false);
+                _takenMap.put(queue, taken);
+            }
+
+            return taken.get();
         }
-        else
+    }
+
+    public boolean taken(AMQQueue queue, Subscription sub)
+    {
+//        if (_taken.getAndSet(true))
+//        {
+//            return true;
+//        }
+//        else
+//        {
+//            _takenBySubcription = sub;
+//            return false;
+//        }
+
+        synchronized (this)
         {
-            _takenBySubcription = sub;
-            return false;
+            AtomicBoolean taken = _takenMap.get(queue);
+            if (taken == null)
+            {
+                taken = new AtomicBoolean(false);
+            }
+
+            if (taken.getAndSet(true))
+            {
+                return true;
+            }
+            else
+            {
+                _takenMap.put(queue, taken);
+                _takenBySubcriptionMap.put(queue, sub);
+                return false;
+            }
         }
     }
 
@@ -479,8 +513,26 @@
         {
             _log.trace("Releasing Message:" + debugIdentity());
         }
-        _taken.set(false);
-        _takenBySubcription = null;
+
+//        _taken.set(false);
+//        _takenBySubcription = null;
+
+
+        synchronized (this)
+        {
+            AtomicBoolean taken = _takenMap.get(queue);
+            if (taken == null)
+            {
+                taken = new AtomicBoolean(false);
+            }
+            else
+            {
+                taken.set(false);
+            }
+
+            _takenMap.put(queue, taken);
+            _takenBySubcriptionMap.put(queue, null);
+        }
     }
 
     public boolean checkToken(Object token)
@@ -833,16 +885,20 @@
 
     public String toString()
     {
-        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
-               _taken + " by :" + _takenBySubcription;
+//        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+//               _taken + " by :" + _takenBySubcription;
 
-//        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
-//               _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+               _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
     }
 
     public Subscription getDeliveredSubscription(AMQQueue queue)
     {
-        return _takenBySubcription;
+//        return _takenBySubcription;
+        synchronized (this)
+        {
+            return _takenBySubcriptionMap.get(queue);
+        }
     }
 
     public void reject(Subscription subscription)