You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/07/17 11:56:18 UTC

svn commit: r556869 - in /incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue: AMQMessage.java AMQQueue.java ConcurrentSelectorDeliveryManager.java

Author: ritchiem
Date: Tue Jul 17 02:56:17 2007
New Revision: 556869

URL: http://svn.apache.org/viewvc?view=rev&rev=556869
Log:
QPID-540 Prevent NPE when purging messages from the main _message queue in the ConcurrentSelectorDeliveryManager that have already been delivered via a Subscriber's _messageQueue. Ensure that any expired messages are still correctly handled, i.e. the queue size/depth is reduced and the message is correctly dequeued from the underlying store.

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=556869&r1=556868&r2=556869
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Jul 17 02:56:17 2007
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
@@ -98,9 +97,9 @@
     public void setExpiration()
     {
         long expiration =
-            ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+                ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
         long timestamp =
-            ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+                ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
 
         if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
         {
@@ -163,8 +162,8 @@
             {
 
                 AMQBody cb =
-                    getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
-                            _messageId, ++_index));
+                        getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
+                                                                                                         _messageId, ++_index));
 
                 return new AMQFrame(_channel, cb);
             }
@@ -250,7 +249,7 @@
      * @throws AMQException
      */
     public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
-        throws AMQException
+            throws AMQException
     {
         _messageId = messageId;
         _messageHandle = factory.createMessageHandle(messageId, store, true);
@@ -267,7 +266,7 @@
      * @param contentHeader
      */
     public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
-        ContentHeaderBody contentHeader) throws AMQException
+                      ContentHeaderBody contentHeader) throws AMQException
     {
         this(messageId, info, txnContext);
         setContentHeaderBody(contentHeader);
@@ -286,8 +285,8 @@
      * @throws AMQException
      */
     public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
-        ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
-        MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
+                      ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
+                      MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
     {
         this(messageId, info, txnContext, contentHeader);
         _transientMessageData.setDestinationQueues(destinationQueues);
@@ -335,7 +334,7 @@
     }
 
     public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
-        throws AMQException
+            throws AMQException
     {
         final boolean persistent = isPersistent();
         _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
@@ -451,7 +450,7 @@
             if (count < 0)
             {
                 throw new MessageCleanupException("Reference count for message id " + debugIdentity()
-                    + " has gone below 0.");
+                                                  + " has gone below 0.");
             }
         }
     }
@@ -668,12 +667,7 @@
         {
             long now = System.currentTimeMillis();
 
-            if (now > _expiration)
-            {
-                dequeue(storecontext, queue);
-
-                return true;
-            }
+            return (now > _expiration);
         }
 
         return false;
@@ -700,7 +694,7 @@
             // first we allow the handle to know that the message has been fully received. This is useful if it is
             // maintaining any calculated values based on content chunks
             _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
-                _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
+                                                          _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
 
             // we then allow the transactional context to do something with the message content
             // now that it has all been received, before we attempt delivery
@@ -936,7 +930,7 @@
         // _taken + " by :" + _takenBySubcription;
 
         return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
-            + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+               + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
     }
 
     public Subscription getDeliveredSubscription(AMQQueue queue)

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=556869&r1=556868&r2=556869
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Jul 17 02:56:17 2007
@@ -1,18 +1,7 @@
 /* Copyright Rupert Smith, 2005 to 2007, all rights reserved. */
 package org.apache.qpid.server.queue;
 
-import java.text.MessageFormat;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.management.JMException;
-
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.framing.AMQShortString;
@@ -26,6 +15,15 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import javax.management.JMException;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
  * fully in RFC 006.
@@ -138,11 +136,11 @@
 
     public int compareTo(Object o)
     {
-        return _name.compareTo(((AMQQueue)o).getName());
+        return _name.compareTo(((AMQQueue) o).getName());
     }
 
     public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
-             throws AMQException
+            throws AMQException
     {
         this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
              new SubscriptionSet(), new SubscriptionImpl.Factory());
@@ -422,6 +420,70 @@
         }
     }
 
+    /**
+     * Removes messages from this queue, and also commits the remove on the message store. The delivery manager is
+     * told via startMovingMessages()/stopMovingMessages() that messages are being taken off this queue for the duration.
+     *
+     * @param fromMessageId The first message id to remove.
+     * @param toMessageId   The last message id to remove.
+     * @param storeContext  The context of the message store under which to perform the remove. This is associated with
+     *                      the store's transactional context.
+     */
+    public synchronized void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
+    {
+        MessageStore fromStore = getVirtualHost().getMessageStore();
+
+        try
+        {
+            // Notify the delivery manager before touching the queue (presumably pauses delivery - TODO confirm).
+            startMovingMessages();
+
+            // Get the list of messages to remove.
+            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+
+            try
+            {
+                fromStore.beginTran(storeContext);
+
+                // Remove the messages from the message store.
+                for (AMQMessage message : foundMessagesList)
+                {
+                    fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
+                }
+
+                // Commit the removal. NOTE(review): a commit failure is rethrown as RuntimeException, so abortTran() below is skipped.
+                try
+                {
+                    fromStore.commitTran(storeContext);
+                }
+                catch (AMQException e)
+                {
+                    throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
+                }
+
+                // Only after a successful commit: remove the messages from the in-memory queues.
+                _deliveryMgr.removeMovedMessages(foundMessagesList);
+            }
+            // Abort the transaction if begin/dequeue failed. NOTE(review): the AMQException is neither logged nor rethrown.
+            catch (AMQException e)
+            {
+                try
+                {
+                    fromStore.abortTran(storeContext);
+                }
+                catch (AMQException ae)
+                {
+                    throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
+                }
+            }
+        }
+        // Always undo startMovingMessages(), on success or failure, so activity on this queue can continue.
+        finally
+        {
+            stopMovingMessages();
+        }
+    }
+
     public void startMovingMessages()
     {
         _deliveryMgr.startMovingMessages();
@@ -560,7 +622,7 @@
         }
 
         Subscription subscription =
-            _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
+                _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
 
         if (subscription.filtersMessages())
         {
@@ -598,14 +660,14 @@
         if (_logger.isDebugEnabled())
         {
             _logger.debug(MessageFormat.format(
-                              "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}",
-                              ps, channel, consumerTag, this));
+                    "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}",
+                    ps, channel, consumerTag, this));
         }
 
         Subscription removedSubscription;
         if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps,
                                                                                                          consumerTag)))
-                == null)
+            == null)
         {
             throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag
                                    + " and protocol session key " + ps.getKey() + " not registered with queue " + this);
@@ -787,7 +849,7 @@
             return false;
         }
 
-        final AMQQueue amqQueue = (AMQQueue)o;
+        final AMQQueue amqQueue = (AMQQueue) o;
 
         return (_name.equals(amqQueue._name));
     }

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=556869&r1=556868&r2=556869
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Jul 17 02:56:17 2007
@@ -20,19 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.Configured;
@@ -42,8 +29,21 @@
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.util.MessageQueue;
 import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
+import org.apache.qpid.util.MessageQueue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 
 /** Manages delivery of messages on behalf of a queue */
@@ -453,12 +453,29 @@
         //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
         while (purgeMessage(message, sub))
         {
+            // if we are purging then ensure we mark this message taken for the current subscriber
+            // the current subscriber may be null in the case of a get or a purge but this is ok.
+//            boolean alreadyTaken = message.taken(_queue, sub);
+
             //remove the already taken message or expired
             AMQMessage removed = messages.poll();
 
             assert removed == message;
 
-            _totalMessageSize.addAndGet(-message.getSize());
+            // if the message expired then the _totalMessageSize needs adjusting
+            if (message.expired(sub.getChannel().getStoreContext(), _queue))
+            {
+                _totalMessageSize.addAndGet(-message.getSize());
+
+                message.dequeue(sub.getChannel().getStoreContext(), _queue);
+
+                if (_log.isInfoEnabled())
+                {
+                    _log.info(debugIdentity() + " Doing clean up of the main _message queue.");
+                }
+            }
+            //else the clean up is not required as the message has already been taken for this queue therefore
+            // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated.
 
             if (_log.isTraceEnabled())
             {
@@ -473,7 +490,10 @@
     }
 
     /**
-     * 
+     *  This method will return true if the message is to be purged from the queue.
+     *
+     *
+     *  SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
      * @param message
      * @param sub
      * @return
@@ -606,7 +626,10 @@
             {
                 if (_log.isInfoEnabled())
                 {
-                    _log.info(debugIdentity() + "We could do clean up of the main _message queue here");
+                    //fixme - we should do the clean up as the message remains on the _message queue
+                    // this is resulting in the next consumer receiving the message and then attempting to purge it
+                    //
+                    _log.info(debugIdentity() + "We should do clean up of the main _message queue here");
                 }
             }
 
@@ -800,7 +823,7 @@
                         if (debugEnabled)
                         {
                             _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
-                                      "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
+                                       "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
                         }
                     }
                 }
@@ -810,7 +833,7 @@
                     if (debugEnabled)
                     {
                         _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
-                                  " Subscriber:" + System.identityHashCode(s));
+                                   " Subscriber:" + System.identityHashCode(s));
                     }
 
                     deliver(context, name, msg, deliverFirst);