You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/18 16:37:30 UTC
svn commit: r530037 -
/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Author: ritchiem
Date: Wed Apr 18 07:37:30 2007
New Revision: 530037
URL: http://svn.apache.org/viewvc?view=rev&rev=530037
Log:
QPID-454 Message 'taken' notion is per message. REVERTED as it just wasn't right; needs to be refactored.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=530037&r1=530036&r2=530037
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed Apr 18 07:37:30 2007
@@ -80,15 +80,17 @@
*/
private boolean _immediate;
+ private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
+ private Subscription _takenBySubcription;
private Set<Subscription> _rejectedBy = null;
- private Map<AMQQueue, AtomicBoolean> _takenMap;
- private Map<AMQQueue, Subscription> _takenBySubcriptionMap;
+ private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
+ private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
public boolean isTaken(AMQQueue queue)
{
- return _takenMap.get(queue).get();
+ return _taken.get();
}
private final int hashcode = System.identityHashCode(this);
@@ -204,8 +206,7 @@
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
- _takenMap = null;
- _takenBySubcriptionMap = null;
+ _taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
@@ -323,11 +324,6 @@
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
- int mapSize = _transientMessageData.getDestinationQueues().size();
-
- _takenMap = new HashMap<AMQQueue, AtomicBoolean>(mapSize);
- _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(mapSize);
-
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
_takenMap.put(q, new AtomicBoolean(false));
@@ -466,17 +462,14 @@
public boolean taken(AMQQueue queue, Subscription sub)
{
- synchronized (queue)
+ if (_taken.getAndSet(true))
{
- if (_takenMap.get(queue).getAndSet(true))
- {
- return true;
- }
- else
- {
- _takenBySubcriptionMap.put(queue, sub);
- return false;
- }
+ return true;
+ }
+ else
+ {
+ _takenBySubcription = sub;
+ return false;
}
}
@@ -486,11 +479,8 @@
{
_log.trace("Releasing Message:" + debugIdentity());
}
- synchronized (queue)
- {
- _takenMap.get(queue).set(false);
- _takenBySubcriptionMap.put(queue, null);
- }
+ _taken.set(false);
+ _takenBySubcription = null;
}
public boolean checkToken(Object token)
@@ -843,13 +833,16 @@
public String toString()
{
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
- _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+ _taken + " by :" + _takenBySubcription;
+
+// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
{
- return _takenBySubcriptionMap.get(queue);
+ return _takenBySubcription;
}
public void reject(Subscription subscription)