You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/18 16:37:30 UTC

svn commit: r530037 - /incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java

Author: ritchiem
Date: Wed Apr 18 07:37:30 2007
New Revision: 530037

URL: http://svn.apache.org/viewvc?view=rev&rev=530037
Log:
QPID-454 Message 'taken' notion is per message. REVERTED as it just wasn't right.. needs to be refactored.

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=530037&r1=530036&r2=530037
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed Apr 18 07:37:30 2007
@@ -80,15 +80,17 @@
      */
     private boolean _immediate;
 
+    private AtomicBoolean _taken = new AtomicBoolean(false);
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
+    private Subscription _takenBySubcription;
     private Set<Subscription> _rejectedBy = null;
-    private Map<AMQQueue, AtomicBoolean> _takenMap;
-    private Map<AMQQueue, Subscription> _takenBySubcriptionMap;
+    private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
+    private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
 
     public boolean isTaken(AMQQueue queue)
     {
-        return _takenMap.get(queue).get();
+        return _taken.get();
     }
 
     private final int hashcode = System.identityHashCode(this);
@@ -204,8 +206,7 @@
         _immediate = info.isImmediate();
         _transientMessageData.setMessagePublishInfo(info);
 
-        _takenMap = null;
-        _takenBySubcriptionMap = null;
+        _taken = new AtomicBoolean(false);
 
         if (_log.isDebugEnabled())
         {
@@ -323,11 +324,6 @@
         // enqueuing the messages ensure that if required the destinations are recorded to a
         // persistent store
 
-        int mapSize = _transientMessageData.getDestinationQueues().size();
-
-        _takenMap = new HashMap<AMQQueue, AtomicBoolean>(mapSize);
-        _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(mapSize);
-
         for (AMQQueue q : _transientMessageData.getDestinationQueues())
         {
             _takenMap.put(q, new AtomicBoolean(false));
@@ -466,17 +462,14 @@
 
     public boolean taken(AMQQueue queue, Subscription sub)
     {
-        synchronized (queue)
+        if (_taken.getAndSet(true))
         {
-            if (_takenMap.get(queue).getAndSet(true))
-            {
-                return true;
-            }
-            else
-            {
-                _takenBySubcriptionMap.put(queue, sub);
-                return false;
-            }
+            return true;
+        }
+        else
+        {
+            _takenBySubcription = sub;
+            return false;
         }
     }
 
@@ -486,11 +479,8 @@
         {
             _log.trace("Releasing Message:" + debugIdentity());
         }
-        synchronized (queue)
-        {
-            _takenMap.get(queue).set(false);
-            _takenBySubcriptionMap.put(queue, null);
-        }
+        _taken.set(false);
+        _takenBySubcription = null;
     }
 
     public boolean checkToken(Object token)
@@ -843,13 +833,16 @@
 
     public String toString()
     {
-        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
-               _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+               _taken + " by :" + _takenBySubcription;
+
+//        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+//               _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
     }
 
     public Subscription getDeliveredSubscription(AMQQueue queue)
     {
-        return _takenBySubcriptionMap.get(queue);
+        return _takenBySubcription;
     }
 
     public void reject(Subscription subscription)