You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2007/03/15 16:44:02 UTC
svn commit: r518669 - in
/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:
AMQQueue.java ConcurrentSelectorDeliveryManager.java DeliveryManager.java
Author: bhupendrab
Date: Thu Mar 15 08:44:01 2007
New Revision: 518669
URL: http://svn.apache.org/viewvc?view=rev&rev=518669
Log:
- DeliveryManager.getMessages is reimplemented because ConcurrentLinkedMessageQueueAtomicSize.toArray is not implemented
- Not taking the lock in startMovingMessages. Just setting _movingMessages to true, because that can stop the sync delivery.
- And some tidy up of the code.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=518669&r1=518668&r2=518669
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Mar 15 08:44:01 2007
@@ -235,37 +235,40 @@
return _deliveryMgr.getMessages();
}
+ /**
+ * Returns messages within the given range of message Ids
+ * @param fromMessageId
+ * @param toMessageId
+ * @return List of messages
+ */
+ public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
+ {
+ return _deliveryMgr.getMessages(fromMessageId, toMessageId);
+ }
+
public long getQueueDepth()
{
return _deliveryMgr.getTotalMessageSize();
}
-
/**
* @param messageId
- *
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
{
- List<AMQMessage> list = getMessagesOnTheQueue();
- AMQMessage msg = null;
- for (AMQMessage message : list)
+ List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId);
+ if (list == null || list.size() == 0)
{
- if (message.getMessageId() == messageId)
- {
- msg = message;
- break;
- }
+ return null;
}
-
- return msg;
+ return list.get(0);
}
/**
* moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
- * moving messages (hold the lock and stop the async delivery) - get all the messages available in the given message
- * id range - setup the other queue for moving messages (hold the lock and stop the async delivery) - send these
+ * moving messages (stop the async delivery) - get all the messages available in the given message
+ * id range - setup the other queue for moving messages (stop the async delivery) - send these
* available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful,
* remove messages from this queue - remove locks from both queues and start async delivery
*
@@ -282,24 +285,7 @@
try
{
startMovingMessages();
- List<AMQMessage> list = getMessagesOnTheQueue();
- List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
- int maxMessageCountToBeMoved = (int) (toMessageId - fromMessageId + 1);
-
- // Run this loop till you find all the messages or the list has no more messages
- for (AMQMessage message : list)
- {
- long msgId = message.getMessageId();
- if (msgId >= fromMessageId && msgId <= toMessageId)
- {
- foundMessagesList.add(message);
- }
- // break the loop as soon as messages to be removed are found
- if (foundMessagesList.size() == maxMessageCountToBeMoved)
- {
- break;
- }
- }
+ List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
// move messages to another queue
anotherQueue.startMovingMessages();
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=518669&r1=518668&r2=518669
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Mar 15 08:44:01 2007
@@ -208,15 +208,61 @@
}
}
-
+ /**
+ * Returns all the messages in the Queue
+ * @return List of messages
+ */
public List<AMQMessage> getMessages()
{
_lock.lock();
- ArrayList<AMQMessage> list = new ArrayList<AMQMessage>(_messages);
+ List<AMQMessage> list = new ArrayList<AMQMessage>();
+
+ for (AMQMessage message : _messages)
+ {
+ list.add(message);
+ }
_lock.unlock();
+
return list;
}
+ /**
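+ * Returns the messages whose ids lie within the given range (both ends inclusive)
+ * @param fromMessageId lowest message id to include; must be greater than zero
+ * @param toMessageId highest message id to include; must be greater than zero
+ * @return List of matching messages (possibly empty), or null if either id is not greater than zero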
+ */
+ public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
+ {
+ if (fromMessageId <= 0 || toMessageId <= 0)
+ {
+ return null;
+ }
+
+ long maxMessageCount = toMessageId - fromMessageId + 1;
+
+ _lock.lock();
+
+ List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+
+ for (AMQMessage message : _messages)
+ {
+ long msgId = message.getMessageId();
+ if (msgId >= fromMessageId && msgId <= toMessageId)
+ {
+ foundMessagesList.add(message);
+ }
+ // stop scanning once as many messages as the id range can contain have been found
+ if (foundMessagesList.size() == maxMessageCount)
+ {
+ break;
+ }
+ }
+ _lock.unlock();
+
+ return foundMessagesList;
+ }
+
public void populatePreDeliveryQueue(Subscription subscription)
{
if (_log.isTraceEnabled())
@@ -294,7 +340,6 @@
*/
public void startMovingMessages()
{
- _lock.lock();
_movingMessages.set(true);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=518669&r1=518668&r2=518669
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Thu Mar 15 08:44:01 2007
@@ -86,6 +86,8 @@
List<AMQMessage> getMessages();
+ List<AMQMessage> getMessages(long fromMessageId, long toMessageId);
+
void populatePreDeliveryQueue(Subscription subscription);
boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;