You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/12/19 00:00:43 UTC
svn commit: r605352 [2/2] - in /incubator/qpid/branches/M2.1/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/...
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Dec 18 15:00:40 2007
@@ -58,9 +58,9 @@
private final Object _sessionKey;
- private MessageQueue<AMQMessage> _messages;
+ private MessageQueue<QueueEntry> _messages;
- private Queue<AMQMessage> _resendQueue;
+ private Queue<QueueEntry> _resendQueue;
private final boolean _noLocal;
@@ -160,7 +160,7 @@
if (filtersMessages())
{
- _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
+ _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
}
else
{
@@ -226,7 +226,7 @@
*
* @throws AMQException
*/
- public void send(AMQMessage msg, AMQQueue queue) throws AMQException
+ public void send(QueueEntry msg, AMQQueue queue) throws AMQException
{
if (msg != null)
{
@@ -245,7 +245,7 @@
}
}
- private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
+ private void sendToBrowser(QueueEntry msg, AMQQueue queue) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -266,11 +266,11 @@
_logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
}
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
}
}
- private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
+ private void sendToConsumer(StoreContext storeContext, QueueEntry entry, AMQQueue queue)
throws AMQException
{
try
@@ -287,9 +287,9 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ _logger.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
}
- queue.dequeue(storeContext, msg);
+ queue.dequeue(storeContext, entry);
}
synchronized (channel)
@@ -298,19 +298,19 @@
if (_sendLock.get())
{
- _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+ _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
}
if (_acks)
{
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ channel.addUnacknowledgedMessage(entry, deliveryTag, consumerTag);
}
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
if (!_acks)
{
- msg.decrementReference(storeContext);
+ entry.getMessage().decrementReference(storeContext);
}
}
}
@@ -320,7 +320,7 @@
// using a try->finally would set it even if an error occurred.
// Is this what we want?
- msg.setDeliveredToConsumer();
+ entry.setDeliveredToConsumer();
}
}
@@ -355,19 +355,19 @@
return _filters != null || _noLocal;
}
- public boolean hasInterest(AMQMessage msg)
+ public boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
- if (msg.isRejectedBy(this))
+ if (entry.isRejectedBy(this))
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity());
+ _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
}
// return false;
}
- final AMQProtocolSession publisher = msg.getPublisher();
+ final AMQProtocolSession publisher = entry.getMessage().getPublisher();
//todo - client id should be recorded and this test removed but handled below
if (_noLocal && publisher != null)
@@ -418,9 +418,9 @@
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity());
+ _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
}
- return checkFilters(msg);
+ return checkFilters(entry);
}
@@ -431,7 +431,7 @@
return id;
}
- private boolean checkFilters(AMQMessage msg)
+ private boolean checkFilters(QueueEntry msg)
{
if (_filters != null)
{
@@ -439,7 +439,7 @@
// {
// _logger.trace("(" + debugIdentity() + ") has filters.");
// }
- return _filters.allAllow(msg);
+ return _filters.allAllow(msg.getMessage());
}
else
{
@@ -452,12 +452,12 @@
}
}
- public Queue<AMQMessage> getPreDeliveryQueue()
+ public Queue<QueueEntry> getPreDeliveryQueue()
{
return _messages;
}
- public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
+ public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst)
{
if (_messages != null)
{
@@ -561,19 +561,19 @@
while (!_resendQueue.isEmpty())
{
- AMQMessage resent = _resendQueue.poll();
+ QueueEntry resent = _resendQueue.poll();
if (_logger.isTraceEnabled())
{
_logger.trace("Removed for resending:" + resent.debugIdentity());
}
- resent.release(_queue);
+ resent.release();
_queue.subscriberHasPendingResend(false, this, resent);
try
{
- channel.getTransactionalContext().deliver(resent, _queue, true);
+ channel.getTransactionalContext().deliver(resent, true);
}
catch (AMQException e)
{
@@ -611,22 +611,22 @@
return _isBrowser;
}
- public boolean wouldSuspend(AMQMessage msg)
+ public boolean wouldSuspend(QueueEntry msg)
{
- return channel.wouldSuspend(msg);
+ return channel.wouldSuspend(msg.getMessage());
}
- public Queue<AMQMessage> getResendQueue()
+ public Queue<QueueEntry> getResendQueue()
{
if (_resendQueue == null)
{
- _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ _resendQueue = new ConcurrentLinkedQueueAtomicSize<QueueEntry>();
}
return _resendQueue;
}
- public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages)
{
if (_resendQueue != null && !_resendQueue.isEmpty())
{
@@ -651,7 +651,7 @@
}
}
- public void addToResendQueue(AMQMessage msg)
+ public void addToResendQueue(QueueEntry msg)
{
// add to our resend queue
getResendQueue().add(msg);
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Tue Dec 18 15:00:40 2007
@@ -30,5 +30,5 @@
{
public List<Subscription> getSubscriptions();
public boolean hasActiveSubscribers();
- public Subscription nextSubscriber(AMQMessage msg);
+ public Subscription nextSubscriber(QueueEntry entry);
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Tue Dec 18 15:00:40 2007
@@ -113,7 +113,7 @@
* concurrently. Also note that because of race conditions and when subscriptions are removed between calls to
* nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning.
*/
- public Subscription nextSubscriber(AMQMessage msg)
+ public Subscription nextSubscriber(QueueEntry msg)
{
if (_subscriptions.isEmpty())
{
@@ -140,7 +140,7 @@
}
}
- private Subscription nextSubscriberImpl(AMQMessage msg)
+ private Subscription nextSubscriberImpl(QueueEntry msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
while (iterator.hasNext())
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Tue Dec 18 15:00:40 2007
@@ -30,6 +30,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -64,14 +65,13 @@
private static class DeliveryDetails
{
- public AMQMessage message;
- public AMQQueue queue;
+ public QueueEntry entry;
+
private boolean deliverFirst;
- public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst)
+ public DeliveryDetails(QueueEntry entry, boolean deliverFirst)
{
- this.message = message;
- this.queue = queue;
+ this.entry = entry;
this.deliverFirst = deliverFirst;
}
}
@@ -103,7 +103,7 @@
_postCommitDeliveryList.clear();
}
- public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
+ public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
{
// A publication will result in the enlisting of several
// TxnOps. The first is an op that will store the message.
@@ -112,9 +112,9 @@
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
// message.incrementReference();
- _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
+ _postCommitDeliveryList.add(new DeliveryDetails(entry, deliverFirst));
_messageDelivered = true;
- _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+ _txnBuffer.enlist(new CleanupMessageOperation(entry.getMessage(), _returnMessages));
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
if (_log.isDebugEnabled())
{
@@ -242,11 +242,11 @@
{
for (DeliveryDetails dd : _postCommitDeliveryList)
{
- dd.queue.process(_storeContext, dd.message, dd.deliverFirst);
+ dd.entry.process(_storeContext, dd.deliverFirst);
try
{
- dd.message.checkDeliveredToConsumer();
+ dd.entry.checkDeliveredToConsumer();
}
catch (NoConsumersException nce)
{
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue Dec 18 15:00:40 2007
@@ -34,6 +34,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -92,14 +93,14 @@
// Does not apply to this context
}
- public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
+ public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
{
try
{
- queue.process(_storeContext, message, deliverFirst);
+ entry.process(_storeContext, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
- message.checkDeliveredToConsumer();
+ entry.checkDeliveredToConsumer();
}
catch (NoConsumersException e)
{
@@ -128,7 +129,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + message.message.getMessageId());
+ _log.debug("Discarding message: " + message.getMessage().getMessageId());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -162,7 +163,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + msg.message.getMessageId());
+ _log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -192,7 +193,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + msg.message.getMessageId());
+ _log.debug("Discarding message: " + msg.getMessage().getMessageId());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -206,7 +207,7 @@
if (_log.isDebugEnabled())
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
- msg.message.getMessageId());
+ msg.getMessage().getMessageId());
}
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Tue Dec 18 15:00:40 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
/**
@@ -111,14 +112,13 @@
*
* <p/>This is an 'enqueue' operation.
*
- * @param message The message to deliver.
- * @param queue The queue to deliver the message to.
+ * @param entry The message to deliver, and the queue to deliver to.
* @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
* for normal FIFO message ordering.
*
* @throws AMQException If the message cannot be delivered for any reason.
*/
- void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
+ void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException;
/**
* Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Tue Dec 18 15:00:40 2007
@@ -24,6 +24,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -85,7 +86,7 @@
}
- protected List<List> createMessageData(java.util.List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+ protected List<List> createMessageData(java.util.List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
boolean showMessageHeaders)
{
@@ -96,8 +97,9 @@
display.add(hex);
display.add(ascii);
- for (AMQMessage msg : messages)
+ for (QueueEntry entry : messages)
{
+ AMQMessage msg = entry.getMessage();
if (!includeMsg(msg, msgids))
{
continue;
@@ -252,8 +254,8 @@
private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
String title, boolean routing, boolean headers, boolean messageHeaders)
{
- List<AMQMessage> single = new LinkedList<AMQMessage>();
- single.add(msg);
+ List<QueueEntry> single = new LinkedList<QueueEntry>();
+ single.add(new QueueEntry(null,msg));
List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Tue Dec 18 15:00:40 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
@@ -166,12 +167,12 @@
if (fromQueue != null)
{
- List<AMQMessage> messages = fromQueue.getMessagesOnTheQueue();
+ List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue();
if (messages != null)
{
- for (AMQMessage msg : messages)
+ for (QueueEntry msg : messages)
{
- ids.add(msg.getMessageId());
+ ids.add(msg.getMessage().getMessageId());
}
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Tue Dec 18 15:00:40 2007
@@ -27,6 +27,7 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -114,7 +115,7 @@
if (_queue != null)
{
- List<AMQMessage> messages = _queue.getMessagesOnTheQueue();
+ List<QueueEntry> messages = _queue.getMessagesOnTheQueue();
if (messages == null || messages.size() == 0)
{
_console.println("No messages on queue");
@@ -153,7 +154,7 @@
* @param showMessageHeaders show the msg headers be shown
* @return the formatted data lists for printing
*/
- protected List<List> createMessageData(List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+ protected List<List> createMessageData(List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting,
boolean showMessageHeaders)
{
@@ -334,8 +335,9 @@
}
//Add create the table of data
- for (AMQMessage msg : messages)
+ for (QueueEntry entry : messages)
{
+ AMQMessage msg = entry.getMessage();
if (!includeMsg(msg, msgids))
{
continue;
Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Tue Dec 18 15:00:40 2007
@@ -104,7 +104,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -146,7 +146,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -166,7 +166,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -207,7 +207,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -227,7 +227,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -247,7 +247,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -266,7 +266,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -308,7 +308,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -327,7 +327,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -369,7 +369,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -403,7 +403,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -446,7 +446,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -487,7 +487,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0));
+ Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Tue Dec 18 15:00:40 2007
@@ -286,7 +286,7 @@
for (int i = 0; i < messageCount; i++)
{
- _queue.process(_storeContext, messages[i], false);
+ _queue.process(_storeContext, new QueueEntry(_queue,messages[i]), false);
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Dec 18 15:00:40 2007
@@ -211,7 +211,7 @@
msg.enqueue(_queue);
msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- _queue.process(_storeContext, msg, false);
+ _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
_queueMBean.viewMessageContent(id);
try
{
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Tue Dec 18 15:00:40 2007
@@ -29,6 +29,7 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -133,7 +134,7 @@
};
TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
- _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
+ _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag));
}
_acked = acked;
_unacked = unacked;
@@ -150,7 +151,7 @@
{
UnacknowledgedMessage u = _map.get(tag);
assertTrue("Message not found for tag " + tag, u != null);
- ((TestMessage) u.message).assertCountEquals(expected);
+ ((TestMessage) u.getMessage()).assertCountEquals(expected);
}
}
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Dec 18 15:00:40 2007
@@ -27,6 +27,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
@@ -250,9 +251,9 @@
* @param deliverFirst
* @throws AMQException
*/
- public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException
+ public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException
{
- messages.add(new HeadersExchangeTest.Message(msg));
+ messages.add(new HeadersExchangeTest.Message(msg.getMessage()));
}
}
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Tue Dec 18 15:00:40 2007
@@ -142,7 +142,7 @@
msg.incrementReference();
msg.routingComplete(_messageStore, _storeContext, factory);
// we manually send the message to the subscription
- _subscription.send(msg, _queue);
+ _subscription.send(new QueueEntry(_queue,msg), _queue);
}
}
@@ -167,7 +167,7 @@
assertTrue(deliveryTag == i);
i++;
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
}
assertTrue(map.size() == msgCount);
@@ -228,7 +228,7 @@
{
assertTrue(deliveryTag == i);
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
{
@@ -257,7 +257,7 @@
{
assertTrue(deliveryTag == i + 5);
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
++i;
}
}
@@ -281,7 +281,7 @@
{
assertTrue(deliveryTag == i + 5);
UnacknowledgedMessage unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.queue == _queue);
+ assertTrue(unackedMsg.getQueue() == _queue);
++i;
}
}
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java Tue Dec 18 15:00:40 2007
@@ -42,9 +42,9 @@
private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
private final Set<Subscription> _active = new HashSet<Subscription>();
- private final List<AMQMessage> _messages = new ArrayList<AMQMessage>();
+ private final List<QueueEntry> _messages = new ArrayList<QueueEntry>();
private int next = 0;//index to next message to send
- private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>());
+ private final List<QueueEntry> _received = Collections.synchronizedList(new ArrayList<QueueEntry>());
private final Executor _executor = new OnCurrentThreadExecutor();
private final List<Thread> _threads = new ArrayList<Thread>();
@@ -159,7 +159,7 @@
}
}
- private AMQMessage nextMessage()
+ private QueueEntry nextMessage()
{
synchronized (_messages)
{
@@ -191,7 +191,7 @@
{
void doRun() throws Throwable
{
- AMQMessage msg = nextMessage();
+ QueueEntry msg = nextMessage();
if (msg != null)
{
_deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false);
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Tue Dec 18 15:00:40 2007
@@ -40,7 +40,7 @@
public void testStartInQueueingMode() throws AMQException
{
- AMQMessage[] messages = new AMQMessage[10];
+ QueueEntry[] messages = new QueueEntry[10];
for (int i = 0; i < messages.length; i++)
{
messages[i] = message();
@@ -85,7 +85,7 @@
public void testStartInDirectMode() throws AMQException
{
- AMQMessage[] messages = new AMQMessage[10];
+ QueueEntry[] messages = new QueueEntry[10];
for (int i = 0; i < messages.length; i++)
{
messages[i] = message();
@@ -132,7 +132,7 @@
{
try
{
- AMQMessage msg = message(true);
+ QueueEntry msg = message(true);
_mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
@@ -154,7 +154,7 @@
SubscriptionTestHelper s = new SubscriptionTestHelper("A");
_subscriptions.addSubscriber(s);
s.setSuspended(true);
- AMQMessage msg = message(true);
+ QueueEntry msg = message(true);
_mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Tue Dec 18 15:00:40 2007
@@ -55,12 +55,12 @@
ApplicationRegistry.initialise(new NullApplicationRegistry());
}
- AMQMessage message() throws AMQException
+ QueueEntry message() throws AMQException
{
return message(false);
}
- AMQMessage message(final boolean immediate) throws AMQException
+ QueueEntry message(final boolean immediate) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -86,8 +86,8 @@
}
};
- return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
- new ContentHeaderBody());
+ return new QueueEntry(null,new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
+ new ContentHeaderBody()));
}
}
Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Tue Dec 18 15:00:40 2007
@@ -28,13 +28,13 @@
public class SubscriptionTestHelper implements Subscription
{
- private final List<AMQMessage> messages;
+ private final List<QueueEntry> messages;
private final Object key;
private boolean isSuspended;
public SubscriptionTestHelper(Object key)
{
- this(key, new ArrayList<AMQMessage>());
+ this(key, new ArrayList<QueueEntry>());
}
public SubscriptionTestHelper(final Object key, final boolean isSuspended)
@@ -43,18 +43,18 @@
setSuspended(isSuspended);
}
- SubscriptionTestHelper(Object key, List<AMQMessage> messages)
+ SubscriptionTestHelper(Object key, List<QueueEntry> messages)
{
this.key = key;
this.messages = messages;
}
- List<AMQMessage> getMessages()
+ List<QueueEntry> getMessages()
{
return messages;
}
- public void send(AMQMessage msg, AMQQueue queue)
+ public void send(QueueEntry msg, AMQQueue queue)
{
messages.add(msg);
}
@@ -69,12 +69,12 @@
return isSuspended;
}
- public boolean wouldSuspend(AMQMessage msg)
+ public boolean wouldSuspend(QueueEntry msg)
{
return isSuspended;
}
- public void addToResendQueue(AMQMessage msg)
+ public void addToResendQueue(QueueEntry msg)
{
//no-op
}
@@ -98,27 +98,27 @@
return false;
}
- public boolean hasInterest(AMQMessage msg)
+ public boolean hasInterest(QueueEntry msg)
{
return true;
}
- public Queue<AMQMessage> getPreDeliveryQueue()
+ public Queue<QueueEntry> getPreDeliveryQueue()
{
return null;
}
- public Queue<AMQMessage> getResendQueue()
+ public Queue<QueueEntry> getResendQueue()
{
return null;
}
- public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages)
{
return messages;
}
- public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
+ public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst)
{
//no-op
}