You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2007/02/09 13:29:15 UTC

svn commit: r505268 - in /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue: AMQQueue.java AMQQueueMBean.java ConcurrentSelectorDeliveryManager.java DeliveryManager.java

Author: bhupendrab
Date: Fri Feb  9 04:29:14 2007
New Revision: 505268

URL: http://svn.apache.org/viewvc?view=rev&rev=505268
Log:
QPID-170
predelivery queues will also be cleared with moved messages. Messages will be moved to another queue and predelivery queues of subsribers of another queue will also be populated.
the features - removeMmessageFromTop and clearQueue is also modified by using the getNextMessage

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Feb  9 04:29:14 2007
@@ -41,6 +41,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
@@ -157,7 +158,7 @@
     /**
      * total messages received by the queue since startup.
      */
-    public long _totalMessagesReceived = 0;
+    public AtomicLong _totalMessagesReceived = new AtomicLong();
 
     public int compareTo(Object o)
     {
@@ -291,59 +292,77 @@
     }
 
     /**
-     * @see ManagedQueue#moveMessages
+     * moves messages from this queue to another queue. to do this the approach is following-
+     * - setup the queue for moving messages (hold the lock and stop the async delivery)
+     * - get all the messages available in the given message id range
+     * - setup the other queue for moving messages (hold the lock and stop the async delivery)
+     * - send these available messages to the other queue (enqueue in other queue)
+     * - Once sending to other Queue is successful, remove messages from this queue
+     * - remove locks from both queues and start async delivery
      * @param fromMessageId
      * @param toMessageId
      * @param queueName
      * @param storeContext
-     * @throws AMQException
      */
     public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
-                                                        StoreContext storeContext) throws AMQException
+                                                        StoreContext storeContext)
     {
+        // prepare the delivery manager for moving messages by stopping the async delivery and creating a lock
         AMQQueue anotherQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        List<AMQMessage> list = getMessagesOnTheQueue();
-        List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
-        int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
-        for (AMQMessage message : list)
+        try
         {
-            long msgId = message.getMessageId();
-            if (msgId >= fromMessageId && msgId <= toMessageId)
-            {
-                foundMessagesList.add(message);
-            }
-            // break the loop as soon as messages to be removed are found
-            if (foundMessagesList.size() == maxMessageCountToBeMoved)
+            startMovingMessages();
+            List<AMQMessage> list = getMessagesOnTheQueue();
+            List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+            int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
+
+            // Run this loop till you find all the messages or the list has no more messages
+            for (AMQMessage message : list)
             {
-                break;
+                long msgId = message.getMessageId();
+                if (msgId >= fromMessageId && msgId <= toMessageId)
+                {
+                    foundMessagesList.add(message);
+                }
+                // break the loop as soon as messages to be removed are found
+                if (foundMessagesList.size() == maxMessageCountToBeMoved)
+                {
+                    break;
+                }
             }
-        }
 
-        // move messages to another queue
-        for (AMQMessage message : foundMessagesList)
+            // move messages to another queue
+            anotherQueue.startMovingMessages();
+            anotherQueue.enqueueMovedMessages(storeContext, foundMessagesList);
+
+            // moving is successful, now remove from original queue
+            _deliveryMgr.removeMovedMessages(foundMessagesList);
+        }
+        finally
         {
-            try
-            {
-                anotherQueue.process(storeContext, message);
-            }
-            catch(AMQException ex)
-            {
-                foundMessagesList.subList(foundMessagesList.indexOf(message), foundMessagesList.size()).clear();
-                // Exception occured, so rollback the changes
-                anotherQueue.removeMessages(foundMessagesList);
-                throw ex;
-            }
+            // remove the lock and start the async delivery
+            anotherQueue.stopMovingMessages();
+            stopMovingMessages();   
         }
+    }
 
-        // moving is successful, now remove from original queue
-        removeMessages(foundMessagesList);
+    public void startMovingMessages()
+    {
+        _deliveryMgr.startMovingMessages();
     }
 
-    public synchronized void removeMessages(List<AMQMessage> messageList)
+    private void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> messageList)
     {
-        _deliveryMgr.removeMessages(messageList);
+        _deliveryMgr.enqueueMovedMessages(storeContext, messageList);
+        _totalMessagesReceived.addAndGet(messageList.size());
     }
 
+    public void stopMovingMessages()
+    {
+        _deliveryMgr.stopMovingMessages();
+        _deliveryMgr.processAsync(_asyncDelivery);
+    }
+    
     /**
      * @return MBean object associated with this Queue
      */
@@ -374,7 +393,7 @@
 
     public long getReceivedMessageCount()
     {
-        return _totalMessagesReceived;
+        return _totalMessagesReceived.get();
     }
 
     public int getMaximumMessageCount()
@@ -407,7 +426,7 @@
     /**
      * Removes the AMQMessage from the top of the queue.
      */
-    public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
+    public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
     {
         _deliveryMgr.removeAMessageFromTop(storeContext);
     }
@@ -415,7 +434,7 @@
     /**
      * removes all the messages from the queue.
      */
-    public long clearQueue(StoreContext storeContext) throws AMQException
+    public synchronized long clearQueue(StoreContext storeContext) throws AMQException
     {
         return _deliveryMgr.clearAllMessages(storeContext);
     }
@@ -633,7 +652,7 @@
 
     protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
     {
-        _totalMessagesReceived++;
+        _totalMessagesReceived.incrementAndGet();
         try
         {
             _managedObject.checkForNotification(msg);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Fri Feb  9 04:29:14 2007
@@ -395,24 +395,9 @@
         {
             throw new OperationsException("\"From MessageId\" should be greater then 0 and less then \"To MessageId\"");            
         }
-        
-        try
-        {
-            _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
-        }
-        catch(AMQException amqex)
-        {
-            throw new JMException("Error moving messages to "  + toQueueName + ": " + amqex);
-        }
 
+        _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
     }
-//
-//    public ObjectName getObjectName() throws MalformedObjectNameException
-//    {
-//        String objNameString = super.getObjectName().toString();
-//
-//        return new ObjectName(objNameString);
-//    }
 
 
     /**

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Feb  9 04:29:14 2007
@@ -75,7 +75,13 @@
      */
     private final AMQQueue _queue;
 
-
+    /**
+     * Flag used while moving messages from this queue to another. For moving messages the async delivery
+     * should also stop. This flat should be set to true to stop async delivery and set to false to enable
+     * async delivery again.
+     */
+    private AtomicBoolean _movingMessages = new AtomicBoolean();
+    
     /**
      * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
      * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
@@ -167,9 +173,12 @@
     }
 
 
-    public synchronized List<AMQMessage> getMessages()
+    public List<AMQMessage> getMessages()
     {
-        return new ArrayList<AMQMessage>(_messages);
+        _lock.lock();
+        ArrayList<AMQMessage> list = new ArrayList<AMQMessage>(_messages);
+        _lock.unlock();
+        return list;
     }
 
     public void populatePreDeliveryQueue(Subscription subscription)
@@ -242,8 +251,52 @@
         }
     }
 
-    public synchronized void removeMessages(List<AMQMessage> messageList)
+    /**
+     * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag,
+     * so that the asyn delivery is also stopped.
+     */
+    public void startMovingMessages()
+    {
+        _lock.lock();
+        _movingMessages.set(true);
+    }
+
+    /**
+     * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag,
+     * so that the async delivery can start again.
+     */
+    public void stopMovingMessages()
+    {
+        _movingMessages.set(false);
+        if (_lock.isHeldByCurrentThread())
+        {
+            _lock.unlock();
+        }
+    }
+
+    /**
+     * Messages will be removed from this queue and all preDeliveryQueues
+     * @param messageList
+     */
+    public void removeMovedMessages(List<AMQMessage> messageList)
     {
+        // Remove from the
+        boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+        if (hasSubscribers)
+        {
+            for (Subscription sub : _subscriptions.getSubscriptions())
+            {
+                if (!sub.isSuspended() && sub.hasFilters())
+                {
+                    Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
+                    for (AMQMessage msg : messageList)
+                    {
+                        preDeliveryQueue.remove(msg);
+                    }
+                }
+            }
+        }
+
         for (AMQMessage msg : messageList)
         {
             if (_messages.remove(msg))
@@ -253,29 +306,42 @@
         }
     }
 
-    public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
+    /**
+     * Now with implementation of predelivery queues, this method will mark the message on the top as taken.
+     * @param storeContext
+     * @throws AMQException
+     */
+    public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
     {
-        AMQMessage msg = poll();
+        _lock.lock();
+        AMQMessage msg = getNextMessage();
         if (msg != null)
         {
-            msg.dequeue(storeContext, _queue);
-            _totalMessageSize.getAndAdd(-msg.getSize());
-        }        
+            // mark this message as taken and get it removed
+            msg.taken();
+            _queue.dequeue(storeContext, msg);
+            getNextMessage();
+        }
+        
+        _lock.unlock();
     }
 
-    public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException
+    public long clearAllMessages(StoreContext storeContext) throws AMQException
     {
         long count = 0;
-        AMQMessage msg = poll();
+        _lock.lock();
+
+        AMQMessage msg = getNextMessage();
         while (msg != null)
         {
-            msg.dequeue(storeContext, _queue);
+            //mark this message as taken and get it removed
+            msg.taken();
+            _queue.dequeue(storeContext, msg);
+            msg = getNextMessage();
             count++;
-            _totalMessageSize.set(0L);
-            msg = poll();
-
         }
 
+        _lock.unlock();
         return count;
     }
 
@@ -298,6 +364,7 @@
         {
             //remove the already taken message
             messages.poll();
+            _totalMessageSize.addAndGet(-message.getSize());
             // try the next message
             message = messages.peek();
         }
@@ -335,6 +402,34 @@
     }
 
     /**
+     * enqueues the messages in the list on the queue and all required predelivery queues
+     * @param storeContext
+     * @param movedMessageList
+     */
+    public void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> movedMessageList)
+    {
+        _lock.lock();
+        for (AMQMessage msg : movedMessageList)
+        {
+            addMessageToQueue(msg);
+        }
+
+        // enqueue on the pre delivery queues
+        for (Subscription sub : _subscriptions.getSubscriptions())
+        {
+            for (AMQMessage msg : movedMessageList)
+            {
+                // Only give the message to those that want them.
+                if (sub.hasInterest(msg))
+                {
+                    sub.enqueueForPreDelivery(msg);
+                }
+            }
+        }
+        _lock.unlock();
+    }
+
+    /**
      * Only one thread should ever execute this method concurrently, but
      * it can do so while other threads invoke deliver().
      */
@@ -343,7 +438,7 @@
         // Continue to process delivery while we haveSubscribers and messages
         boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
 
-        while (hasSubscribers && hasQueuedMessages())
+        while (hasSubscribers && hasQueuedMessages() && !_movingMessages.get())
         {
             hasSubscribers = false;
 
@@ -378,11 +473,6 @@
         }
     }
 
-    private AMQMessage poll()
-    {
-        return _messages.poll();
-    }
-
     public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException
     {
         if (_log.isDebugEnabled())
@@ -482,7 +572,7 @@
         public void run()
         {
             boolean running = true;
-            while (running)
+            while (running && !_movingMessages.get())
             {
                 processQueue();
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=505268&r1=505267&r2=505268
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Feb  9 04:29:14 2007
@@ -76,7 +76,13 @@
 
     long clearAllMessages(StoreContext storeContext) throws AMQException;
 
-    void removeMessages(List<AMQMessage> messageListToRemove);
+    void startMovingMessages();
+
+    void enqueueMovedMessages(StoreContext context, List<AMQMessage> messageList);
+
+    void stopMovingMessages();
+
+    void removeMovedMessages(List<AMQMessage> messageListToRemove);
 
     List<AMQMessage> getMessages();