You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/17 18:08:04 UTC
svn commit: r529659 - in
/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server:
AMQChannel.java handler/BasicRejectMethodHandler.java queue/AMQMessage.java
queue/ConcurrentSelectorDeliveryManager.java queue/SubscriptionImpl.java
Author: ritchiem
Date: Tue Apr 17 09:08:00 2007
New Revision: 529659
URL: http://svn.apache.org/viewvc?view=rev&rev=529659
Log:
QPID-454 Message 'taken' notion is per message, but should be per message per queue.
AMQChannel - pass queue in on all take/release/getDeliveredSubscription calls
BasicRejectMethodHandler - pass queue in on getDeliveredSubscription calls
AMQMessage - Changes to require AMQQueue on all take/release/getDeliveredSubscription calls
ConcurrentSelectorDeliveryManager - pass queue in on take/release/getDeliveredSubscription calls
SubscriptionImpl - pass queue in on release calls
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=529659&r1=529658&r2=529659
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Apr 17 09:08:00 2007
@@ -472,7 +472,7 @@
if (unacked.queue != null)
{
// Ensure message is released for redelivery
- unacked.message.release();
+ unacked.message.release(unacked.queue);
// Mark message redelivered
unacked.message.setRedelivered(true);
@@ -503,7 +503,10 @@
{
// Ensure message is released for redelivery
- unacked.message.release();
+ if (unacked.queue != null)
+ {
+ unacked.message.release(unacked.queue);
+ }
// Mark message redelivered
unacked.message.setRedelivered(true);
@@ -672,14 +675,14 @@
// else
// {
//release to allow it to be delivered
- msg.release();
+ msg.release(message.queue);
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
msg.setRedelivered(true);
- Subscription sub = msg.getDeliveredSubscription();
+ Subscription sub = msg.getDeliveredSubscription(message.queue);
if (sub != null)
{
@@ -753,7 +756,7 @@
// Process Messages to Requeue at the front of the queue
for (UnacknowledgedMessage message : msgToRequeue)
{
- message.message.release();
+ message.message.release(message.queue);
message.message.setRedelivered(true);
deliveryContext.deliver(message.message, message.queue, true);
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?view=diff&rev=529659&r1=529658&r2=529659
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Tue Apr 17 09:08:00 2007
@@ -98,7 +98,7 @@
// If we haven't requested message to be resent to this consumer then reject it from ever getting it.
// if (!evt.getMethod().resend)
{
- message.message.reject(message.message.getDeliveredSubscription());
+ message.message.reject(message.message.getDeliveredSubscription(message.queue));
}
if (evt.getMethod().requeue)
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=529659&r1=529658&r2=529659
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Apr 17 09:08:00 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -42,6 +43,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,20 +80,19 @@
*/
private boolean _immediate;
- private AtomicBoolean _taken = new AtomicBoolean(false);
-
private TransientMessageData _transientMessageData = new TransientMessageData();
- private Subscription _takenBySubcription;
-
private Set<Subscription> _rejectedBy = null;
+ private Map<AMQQueue, AtomicBoolean> _takenMap;
+ private Map<AMQQueue, Subscription> _takenBySubcriptionMap;
- public boolean isTaken()
+ public boolean isTaken(AMQQueue queue)
{
- return _taken.get();
+ return _takenMap.get(queue).get();
}
private final int hashcode = System.identityHashCode(this);
+
public String debugIdentity()
{
return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
@@ -202,10 +204,12 @@
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
- _taken = new AtomicBoolean(false);
+ _takenMap = null;
+ _takenBySubcriptionMap = null;
+
if (_log.isDebugEnabled())
{
- _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")");
+ _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity() + ")");
}
}
@@ -318,8 +322,15 @@
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
+
+ int mapSize = _transientMessageData.getDestinationQueues().size();
+
+ _takenMap = new HashMap<AMQQueue, AtomicBoolean>(mapSize);
+ _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(mapSize);
+
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
+ _takenMap.put(q, new AtomicBoolean(false));
_messageHandle.enqueue(storeContext, _messageId, q);
}
@@ -356,12 +367,13 @@
}
/**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation.
+ * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
+ * operation.
*/
public AMQMessage takeReference()
{
_referenceCount.incrementAndGet();
- return this;
+ return this;
}
/** Threadsafe. Increment the reference count on the message. */
@@ -378,9 +390,10 @@
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
*
+ * @param storeContext
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
- * @param storeContext
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
@@ -451,27 +464,33 @@
}
- public boolean taken(Subscription sub)
+ public boolean taken(AMQQueue queue, Subscription sub)
{
- if (_taken.getAndSet(true))
+ synchronized (queue)
{
- return true;
- }
- else
- {
- _takenBySubcription = sub;
- return false;
+ if (_takenMap.get(queue).getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenBySubcriptionMap.put(queue, sub);
+ return false;
+ }
}
}
- public void release()
+ public void release(AMQQueue queue)
{
if (_log.isTraceEnabled())
{
_log.trace("Releasing Message:" + debugIdentity());
}
- _taken.set(false);
- _takenBySubcription = null;
+ synchronized (queue)
+ {
+ _takenMap.get(queue).set(false);
+ _takenBySubcriptionMap.put(queue, null);
+ }
}
public boolean checkToken(Object token)
@@ -600,7 +619,7 @@
for (AMQQueue q : destinationQueues)
{
//Increment the references to this message for each queue delivery.
- incrementReference();
+ incrementReference();
//normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
@@ -824,13 +843,13 @@
public String toString()
{
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
- _taken + " by:" + _takenBySubcription;
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+ _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
- public Subscription getDeliveredSubscription()
+ public Subscription getDeliveredSubscription(AMQQueue queue)
{
- return _takenBySubcription;
+ return _takenBySubcriptionMap.get(queue);
}
public void reject(Subscription subscription)
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=529659&r1=529658&r2=529659
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Apr 17 09:08:00 2007
@@ -210,6 +210,7 @@
/**
* Returns all the messages in the Queue
+ *
* @return List of messages
*/
public List<AMQMessage> getMessages()
@@ -222,14 +223,16 @@
list.add(message);
}
_lock.unlock();
-
+
return list;
}
/**
* Returns messages within the range of given messageIds
+ *
* @param fromMessageId
* @param toMessageId
+ *
* @return
*/
public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
@@ -242,7 +245,7 @@
long maxMessageCount = toMessageId - fromMessageId + 1;
_lock.lock();
-
+
List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
for (AMQMessage message : _messages)
@@ -399,7 +402,7 @@
public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
_lock.lock();
-
+
AMQMessage message = _messages.poll();
if (message != null)
{
@@ -432,9 +435,7 @@
return count;
}
- /**
- This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
- */
+ /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
@@ -445,7 +446,7 @@
AMQMessage message = messages.peek();
//while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.)
- while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub))
+ while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(_queue, sub))
{
//remove the already taken message
AMQMessage removed = messages.poll();
@@ -562,7 +563,7 @@
}
catch (AMQException e)
{
- message.release();
+ message.release(_queue);
_log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
}
}
@@ -723,7 +724,7 @@
_log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- msg.taken(s);
+ msg.taken(_queue, s);
//Deliver the message
s.send(msg, _queue);
}
@@ -737,7 +738,7 @@
}
}
- if (!msg.isTaken())
+ if (!msg.isTaken(_queue))
{
if (_log.isInfoEnabled())
{
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=529659&r1=529658&r2=529659
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Apr 17 09:08:00 2007
@@ -558,7 +558,7 @@
_logger.trace("Removed for resending:" + resent.debugIdentity());
}
- resent.release();
+ resent.release(_queue);
_queue.subscriberHasPendingResend(false, this, resent);
try