You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/07/17 11:56:18 UTC
svn commit: r556869 - in
/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue:
AMQMessage.java AMQQueue.java ConcurrentSelectorDeliveryManager.java
Author: ritchiem
Date: Tue Jul 17 02:56:17 2007
New Revision: 556869
URL: http://svn.apache.org/viewvc?view=rev&rev=556869
Log:
QPID-540 Prevent NPE when purging message from the main _message queue in the ConcurrentSelectorDeliveryManager that have been delivered via a Subscribers _messageQueue. Ensuring that any expired messages are still correctly handled. i.e. the Queue size/depth is reduced and the message correctly dequeued from the underlying store.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=556869&r1=556868&r2=556869
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Jul 17 02:56:17 2007
@@ -21,7 +21,6 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
@@ -98,9 +97,9 @@
public void setExpiration()
{
long expiration =
- ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+ ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
long timestamp =
- ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+ ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
{
@@ -163,8 +162,8 @@
{
AMQBody cb =
- getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
- _messageId, ++_index));
+ getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
+ _messageId, ++_index));
return new AMQFrame(_channel, cb);
}
@@ -250,7 +249,7 @@
* @throws AMQException
*/
public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
- throws AMQException
+ throws AMQException
{
_messageId = messageId;
_messageHandle = factory.createMessageHandle(messageId, store, true);
@@ -267,7 +266,7 @@
* @param contentHeader
*/
public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
- ContentHeaderBody contentHeader) throws AMQException
+ ContentHeaderBody contentHeader) throws AMQException
{
this(messageId, info, txnContext);
setContentHeaderBody(contentHeader);
@@ -286,8 +285,8 @@
* @throws AMQException
*/
public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
- ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
- MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
+ ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
+ MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
{
this(messageId, info, txnContext, contentHeader);
_transientMessageData.setDestinationQueues(destinationQueues);
@@ -335,7 +334,7 @@
}
public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
- throws AMQException
+ throws AMQException
{
final boolean persistent = isPersistent();
_messageHandle = factory.createMessageHandle(_messageId, store, persistent);
@@ -451,7 +450,7 @@
if (count < 0)
{
throw new MessageCleanupException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
+ + " has gone below 0.");
}
}
}
@@ -668,12 +667,7 @@
{
long now = System.currentTimeMillis();
- if (now > _expiration)
- {
- dequeue(storecontext, queue);
-
- return true;
- }
+ return (now > _expiration);
}
return false;
@@ -700,7 +694,7 @@
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
_messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
- _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
+ _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with the message content
// now that it has all been received, before we attempt delivery
@@ -936,7 +930,7 @@
// _taken + " by :" + _takenBySubcription;
return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
- + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=556869&r1=556868&r2=556869
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Jul 17 02:56:17 2007
@@ -1,18 +1,7 @@
/* Copyright Rupert Smith, 2005 to 2007, all rights reserved. */
package org.apache.qpid.server.queue;
-import java.text.MessageFormat;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.management.JMException;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
@@ -26,6 +15,15 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.management.JMException;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
* fully in RFC 006.
@@ -138,11 +136,11 @@
public int compareTo(Object o)
{
- return _name.compareTo(((AMQQueue)o).getName());
+ return _name.compareTo(((AMQQueue) o).getName());
}
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
+ throws AMQException
{
this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
new SubscriptionSet(), new SubscriptionImpl.Factory());
@@ -422,6 +420,70 @@
}
}
+ /**
+ * Removes messages from this queue, and also commits the remove on the message store. Delivery activity
+ * on the queues being moved between is suspended during the remove.
+ *
+ * @param fromMessageId The first message id to move.
+ * @param toMessageId The last message id to move.
+ * @param storeContext The context of the message store under which to perform the move. This is associated with
+ * the stores transactional context.
+ */
+ public synchronized void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
+ {
+ MessageStore fromStore = getVirtualHost().getMessageStore();
+
+ try
+ {
+ // Obtain locks to prevent activity on the queues being moved between.
+ startMovingMessages();
+
+ // Get the list of messages to move.
+ List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+ try
+ {
+ fromStore.beginTran(storeContext);
+
+ // remove the messages in on the message store.
+ for (AMQMessage message : foundMessagesList)
+ {
+ fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+ }
+
+ // Commit and flush the move transcations.
+ try
+ {
+ fromStore.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+ }
+
+ // remove the messages on the in-memory queues.
+ _deliveryMgr.removeMovedMessages(foundMessagesList);
+ }
+ // Abort the move transactions on move failures.
+ catch (AMQException e)
+ {
+ try
+ {
+ fromStore.abortTran(storeContext);
+ }
+ catch (AMQException ae)
+ {
+ throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+ }
+ }
+ }
+ // Release locks to allow activity on the queues being moved between to continue.
+ finally
+ {
+ stopMovingMessages();
+ }
+ }
+
public void startMovingMessages()
{
_deliveryMgr.startMovingMessages();
@@ -560,7 +622,7 @@
}
Subscription subscription =
- _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
+ _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
if (subscription.filtersMessages())
{
@@ -598,14 +660,14 @@
if (_logger.isDebugEnabled())
{
_logger.debug(MessageFormat.format(
- "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}",
- ps, channel, consumerTag, this));
+ "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}",
+ ps, channel, consumerTag, this));
}
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps,
consumerTag)))
- == null)
+ == null)
{
throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag
+ " and protocol session key " + ps.getKey() + " not registered with queue " + this);
@@ -787,7 +849,7 @@
return false;
}
- final AMQQueue amqQueue = (AMQQueue)o;
+ final AMQQueue amqQueue = (AMQQueue) o;
return (_name.equals(amqQueue._name));
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=556869&r1=556868&r2=556869
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Jul 17 02:56:17 2007
@@ -20,19 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
@@ -42,8 +29,21 @@
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.util.MessageQueue;
import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
+import org.apache.qpid.util.MessageQueue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
/** Manages delivery of messages on behalf of a queue */
@@ -453,12 +453,29 @@
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
while (purgeMessage(message, sub))
{
+ // if we are purging then ensure we mark this message taken for the current subscriber
+ // the current subscriber may be null in the case of a get or a purge but this is ok.
+// boolean alreadyTaken = message.taken(_queue, sub);
+
//remove the already taken message or expired
AMQMessage removed = messages.poll();
assert removed == message;
- _totalMessageSize.addAndGet(-message.getSize());
+ // if the message expired then the _totalMessageSize needs adjusting
+ if (message.expired(sub.getChannel().getStoreContext(), _queue))
+ {
+ _totalMessageSize.addAndGet(-message.getSize());
+
+ message.dequeue(sub.getChannel().getStoreContext(), _queue);
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info(debugIdentity() + " Doing clean up of the main _message queue.");
+ }
+ }
+ //else the clean up is not required as the message has already been taken for this queue therefore
+ // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated.
if (_log.isTraceEnabled())
{
@@ -473,7 +490,10 @@
}
/**
- *
+ * This method will return true if the message is to be purged from the queue.
+ *
+ *
+ * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
* @param message
* @param sub
* @return
@@ -606,7 +626,10 @@
{
if (_log.isInfoEnabled())
{
- _log.info(debugIdentity() + "We could do clean up of the main _message queue here");
+ //fixme - we should do the clean up as the message remains on the _message queue
+ // this is resulting in the next consumer receiving the message and then attempting to purge it
+ //
+ _log.info(debugIdentity() + "We should do clean up of the main _message queue here");
}
}
@@ -800,7 +823,7 @@
if (debugEnabled)
{
_log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
- "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
+ "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
}
}
}
@@ -810,7 +833,7 @@
if (debugEnabled)
{
_log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
- " Subscriber:" + System.identityHashCode(s));
+ " Subscriber:" + System.identityHashCode(s));
}
deliver(context, name, msg, deliverFirst);