You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/08 11:10:11 UTC
svn commit: r619823 [8/19] - in /incubator/qpid/branches/thegreatmerge/qpid:
./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/
dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/
dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Common...
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Feb 8 02:09:37 2008
@@ -55,7 +55,7 @@
defaultValue = "false")
public boolean compressBufferOnQueue;
/** Holds any queued messages */
- private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
+ private final MessageQueue<QueueEntry> _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
/** Ensures that only one asynchronous task is running for this manager at any time. */
private final AtomicBoolean _processing = new AtomicBoolean();
@@ -107,8 +107,9 @@
}
- private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst)
+ private boolean addMessageToQueue(QueueEntry entry, boolean deliverFirst)
{
+ AMQMessage msg = entry.getMessage();
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
{
@@ -124,12 +125,12 @@
{
synchronized (_queueHeadLock)
{
- _messages.pushHead(msg);
+ _messages.pushHead(entry);
}
}
else
{
- _messages.offer(msg);
+ _messages.offer(entry);
}
_totalMessageSize.addAndGet(msg.getSize());
@@ -175,11 +176,11 @@
public long getOldestMessageArrival()
{
- AMQMessage msg = _messages.peek();
- return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+ QueueEntry entry = _messages.peek();
+ return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
}
- public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg)
+ public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry entry)
{
_lock.lock();
try
@@ -188,19 +189,19 @@
{
_log.debug("Queue has adding subscriber content");
_hasContent.add(subscription);
- _totalMessageSize.addAndGet(msg.getSize());
+ _totalMessageSize.addAndGet(entry.getSize());
_extraMessages.addAndGet(1);
}
else
{
_log.debug("Queue has removing subscriber content");
- if (msg == null)
+ if (entry == null)
{
_hasContent.remove(subscription);
}
else
{
- _totalMessageSize.addAndGet(-msg.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
_extraMessages.addAndGet(-1);
}
}
@@ -212,18 +213,48 @@
}
/**
+ * NOTE : This method should only be called when there are no active subscribers
+ */
+ public void removeExpired() throws AMQException
+ {
+     // Hold the manager lock for the whole sweep, as the other queue-walking methods do.
+     _lock.lock();
+     try
+     {
+         // Walk the main message queue and drop every entry that reports itself expired.
+         for (Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
+         {
+             QueueEntry entry = iter.next();
+             if (entry.expired())
+             {
+                 // fixme: Currently we have to update the total byte size here for the data in the queue
+                 _totalMessageSize.addAndGet(-entry.getSize());
+                 // Dequeue using the reaping store context, then unlink the entry via the iterator.
+                 _queue.dequeue(_reapingStoreContext, entry);
+                 iter.remove();
+             }
+         }
+     }
+     finally
+     {
+         // entry.expired() is declared to throw AMQException; without this finally block an
+         // exception raised mid-sweep would leave _lock held and block every later caller.
+         _lock.unlock();
+     }
+ }
+
+ /**
+  * Reports the current value of the asynchronous-processing flag.
+  *
+  * @return the value currently held by the {@code _processing} flag
+  */
+ public boolean isProcessingAsync()
+ {
+     final boolean processing = _processing.get();
+     return processing;
+ }
+
+ /**
* Returns all the messages in the Queue
*
* @return List of messages
*/
- public List<AMQMessage> getMessages()
+ public List<QueueEntry> getMessages()
{
_lock.lock();
- List<AMQMessage> list = new ArrayList<AMQMessage>();
+ List<QueueEntry> list = new ArrayList<QueueEntry>();
- for (AMQMessage message : _messages)
+ for (QueueEntry entry : _messages)
{
- list.add(message);
+ list.add(entry);
}
_lock.unlock();
@@ -238,7 +269,7 @@
*
* @return
*/
- public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
+ public List<QueueEntry> getMessages(long fromMessageId, long toMessageId)
{
if (fromMessageId <= 0 || toMessageId <= 0)
{
@@ -249,14 +280,14 @@
_lock.lock();
- List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+ List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
- for (AMQMessage message : _messages)
+ for (QueueEntry entry : _messages)
{
- long msgId = message.getMessageId();
+ long msgId = entry.getMessage().getMessageId();
if (msgId >= fromMessageId && msgId <= toMessageId)
{
- foundMessagesList.add(message);
+ foundMessagesList.add(entry);
}
// break if the no of messages are found
if (foundMessagesList.size() == maxMessageCount)
@@ -276,22 +307,26 @@
_log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
}
- Iterator<AMQMessage> currentQueue = _messages.iterator();
+ Iterator<QueueEntry> currentQueue = _messages.iterator();
while (currentQueue.hasNext())
{
- AMQMessage message = currentQueue.next();
- if (subscription.hasInterest(message))
+ QueueEntry entry = currentQueue.next();
+
+ if (!entry.getDeliveredToConsumer())
{
- subscription.enqueueForPreDelivery(message, false);
+ if (subscription.hasInterest(entry)) // TGM: should be and'd
+ {
+ subscription.enqueueForPreDelivery(entry, false);
+ }
}
}
}
public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
{
- AMQMessage msg = getNextMessage();
- if (msg == null)
+ QueueEntry entry = getNextMessage();
+ if (entry == null)
{
return false;
}
@@ -313,9 +348,9 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ _log.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
}
- _queue.dequeue(channel.getStoreContext(), msg);
+ _queue.dequeue(channel.getStoreContext(), entry);
}
synchronized (channel)
{
@@ -323,17 +358,22 @@
if (acks)
{
- channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
+ channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
- protocolSession.getProtocolOutputConverter().writeGetOk(msg, channel.getChannelId(),
+ protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
deliveryTag, _queue.getMessageCount());
- _totalMessageSize.addAndGet(-msg.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
+ }
+
+ if (!acks)
+ {
+ entry.getMessage().decrementReference(channel.getStoreContext());
}
}
finally
{
- msg.setDeliveredToConsumer();
+ entry.setDeliveredToConsumer();
}
return true;
@@ -367,7 +407,7 @@
*
* @param messageList
*/
- public void removeMovedMessages(List<AMQMessage> messageList)
+ public void removeMovedMessages(List<QueueEntry> messageList)
{
// Remove from the
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -377,20 +417,20 @@
{
if (!sub.isSuspended() && sub.filtersMessages())
{
- Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
- for (AMQMessage msg : messageList)
+ Queue<QueueEntry> preDeliveryQueue = sub.getPreDeliveryQueue();
+ for (QueueEntry entry : messageList)
{
- preDeliveryQueue.remove(msg);
+ preDeliveryQueue.remove(entry);
}
}
}
}
- for (AMQMessage msg : messageList)
+ for (QueueEntry entry : messageList)
{
- if (_messages.remove(msg))
+ if (_messages.remove(entry))
{
- _totalMessageSize.getAndAdd(-msg.getSize());
+ _totalMessageSize.getAndAdd(-entry.getSize());
}
}
}
@@ -406,10 +446,16 @@
{
_lock.lock();
- AMQMessage message = _messages.poll();
- if (message != null)
+ QueueEntry entry = _messages.poll();
+ if (entry != null)
{
- _totalMessageSize.addAndGet(-message.getSize());
+ _queue.dequeue(storeContext, entry);
+
+ _totalMessageSize.addAndGet(-entry.getSize());
+
+ //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
+ entry.getMessage().decrementReference(storeContext);
+
}
_lock.unlock();
@@ -422,14 +468,16 @@
synchronized (_queueHeadLock)
{
- AMQMessage msg = getNextMessage();
- while (msg != null)
+ QueueEntry entry = getNextMessage();
+ while (entry != null)
{
//and remove it
_messages.poll();
- _queue.dequeue(storeContext, msg);
- msg = getNextMessage();
+ _queue.dequeue(storeContext, entry);
+ entry.getMessage().decrementReference(_reapingStoreContext);
+
+ entry = getNextMessage();
count++;
}
_totalMessageSize.set(0L);
@@ -445,34 +493,35 @@
*
* @throws org.apache.qpid.AMQException
*/
- private AMQMessage getNextMessage() throws AMQException
+ private QueueEntry getNextMessage() throws AMQException
{
- return getNextMessage(_messages, null);
+ return getNextMessage(_messages, null, false);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException
+ private QueueEntry getNextMessage(Queue<QueueEntry> messages, Subscription sub, boolean purgeOnly) throws AMQException
{
- AMQMessage message = messages.peek();
+ QueueEntry entry = messages.peek();
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
- while (purgeMessage(message, sub))
+ while (purgeMessage(entry, sub, purgeOnly))
{
+ AMQMessage message = entry.getMessage();
// if we are purging then ensure we mark this message taken for the current subscriber
// the current subscriber may be null in the case of a get or a purge but this is ok.
// boolean alreadyTaken = message.taken(_queue, sub);
//remove the already taken message or expired
- AMQMessage removed = messages.poll();
+ QueueEntry removed = messages.poll();
- assert removed == message;
+ assert removed == entry;
// if the message expired then the _totalMessageSize needs adjusting
- if (message.expired(_queue))
+ if (message.expired(_queue) && !entry.getDeliveredToConsumer())
{
- _totalMessageSize.addAndGet(-message.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- message.dequeue(_reapingStoreContext, _queue);
+ _queue.dequeue(_reapingStoreContext, entry);
if (_log.isInfoEnabled())
{
@@ -489,10 +538,10 @@
}
// try the next message
- message = messages.peek();
+ entry = messages.peek();
}
- return message;
+ return entry;
}
/**
@@ -505,7 +554,25 @@
* @return
* @throws AMQException
*/
- private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+ private boolean purgeMessage(QueueEntry message, Subscription sub) throws AMQException
+ {
+     // Convenience overload: delegate to the three-argument form with purgeOnly switched off.
+     final boolean purgeOnly = false;
+     return purgeMessage(message, sub, purgeOnly);
+ }
+
+ /**
+ * This method will return true if the message is to be purged from the queue.
+ *
+ * SIDE-EFFECT: The msg will be taken by the Subscription(sub) for the current Queue(_queue) when purgeOnly is false
+ *
+ * @param message
+ * @param sub
+ * @param purgeOnly When set to false the message will be taken by the given Subscription.
+ *
+ * @return if the msg should be purged
+ *
+ * @throws AMQException
+ */
+ private boolean purgeMessage(QueueEntry message, Subscription sub, boolean purgeOnly) throws AMQException
{
//Original.. complicated while loop control
// (message != null
@@ -520,7 +587,7 @@
if (message != null)
{
// Check that the message hasn't expired.
- if (message.expired(_queue))
+ if (message.expired())
{
return true;
}
@@ -529,27 +596,36 @@
if (sub != null)
{
// if we have a queue browser(we don't purge) so check mark the message as taken
- purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+ purge = ((!sub.isBrowser() || message.isTaken()));
}
else
{
// if there is no subscription we are doing
// a get or purging so mark message as taken.
- message.isTaken(_queue);
+ message.isTaken();
// and then ensure that it gets purged
purge = true;
}
}
- // if we are purging then ensure we mark this message taken for the current subscriber
- // the current subscriber may be null in the case of a get or a purge but this is ok.
- return purge && message.taken(_queue, sub);
+ if (purgeOnly)
+ {
+ // If we are simply purging the queue don't take the message
+ // just purge up to the next non-taken msg.
+ return purge && message.isTaken();
+ }
+ else
+ {
+ // if we are purging then ensure we mark this message taken for the current subscriber
+ // the current subscriber may be null in the case of a get or a purge but this is ok.
+ return purge && message.taken(sub);
+ }
}
- public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
+ public void sendNextMessage(Subscription sub, AMQQueue queue)
{
- Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+ Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
if (_log.isTraceEnabled())
{
@@ -568,16 +644,16 @@
return;
}
- AMQMessage message = null;
- AMQMessage removed = null;
+ QueueEntry entry = null;
+ QueueEntry removed = null;
try
{
synchronized (_queueHeadLock)
{
- message = getNextMessage(messageQueue, sub);
+ entry = getNextMessage(messageQueue, sub, false);
// message will be null if we have no messages in the messageQueue.
- if (message == null)
+ if (entry == null)
{
if (_log.isTraceEnabled())
{
@@ -587,12 +663,17 @@
}
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Async Delivery Message :" + message + "(" + System.identityHashCode(message) +
+ _log.debug(debugIdentity() + "Async Delivery Message :" + entry + "(" + System.identityHashCode(entry) +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
- sub.send(message, _queue);
+ if (messageQueue == _messages)
+ {
+ _totalMessageSize.addAndGet(-entry.getSize());
+ }
+
+ sub.send(entry, _queue);
//remove sent message from our queue.
removed = messageQueue.poll();
@@ -600,14 +681,14 @@
// Otherwise the Async send will never end
}
- if (removed != message)
+ if (removed != entry)
{
- _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed);
+ _log.error("Just send message:" + entry.getMessage().debugIdentity() + " BUT removed this from queue:" + removed);
}
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message +
+ _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.getMessage().debugIdentity() + "d:" + entry +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
@@ -639,16 +720,12 @@
}
}
- if ((message != null) && (messageQueue == _messages))
- {
- _totalMessageSize.addAndGet(-message.getSize());
- }
}
catch (AMQException e)
{
- if (message != null)
+ if (entry != null)
{
- message.release(_queue);
+ entry.release();
}
else
{
@@ -664,23 +741,23 @@
* @param storeContext
* @param movedMessageList
*/
- public void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> movedMessageList)
+ public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList)
{
_lock.lock();
- for (AMQMessage msg : movedMessageList)
+ for (QueueEntry entry : movedMessageList)
{
- addMessageToQueue(msg, false);
+ addMessageToQueue(entry, false);
}
// enqueue on the pre delivery queues
for (Subscription sub : _subscriptions.getSubscriptions())
{
- for (AMQMessage msg : movedMessageList)
+ for (QueueEntry entry : movedMessageList)
{
// Only give the message to those that want them.
- if (sub.hasInterest(msg))
+ if (sub.hasInterest(entry))
{
- sub.enqueueForPreDelivery(msg, true);
+ sub.enqueueForPreDelivery(entry, true);
}
}
}
@@ -732,30 +809,30 @@
}
- public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
+ public void deliver(StoreContext context, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws AMQException
{
final boolean debugEnabled = _log.isDebugEnabled();
if (debugEnabled)
{
- _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
+ _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + entry);
}
//Check if we have someone to deliver the message to.
_lock.lock();
try
{
- Subscription s = _subscriptions.nextSubscriber(msg);
+ Subscription s = _subscriptions.nextSubscriber(entry);
if (s == null || hasQueuedMessages()) //no-one can take the message right now or we're queueing
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
+ _log.debug(debugIdentity() + "Testing Message(" + entry + ") for Queued Delivery:" + currentStatus());
}
- if (!msg.getMessagePublishInfo().isImmediate())
+ if (!entry.getMessage().getMessagePublishInfo().isImmediate())
{
- addMessageToQueue(msg, deliverFirst);
+ addMessageToQueue(entry, deliverFirst);
//release lock now message is on queue.
_lock.unlock();
@@ -770,25 +847,25 @@
{
// stop if the message gets delivered whilst PreDelivering if we have a shared queue.
- if (_queue.isShared() && msg.getDeliveredToConsumer())
+ if (_queue.isShared() && entry.getDeliveredToConsumer())
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+ _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(entry) +
") is already delivered.");
}
continue;
}
// Only give the message to those that want them.
- if (sub.hasInterest(msg))
+ if (sub.hasInterest(entry))
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(entry) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
- sub.enqueueForPreDelivery(msg, deliverFirst);
+ sub.enqueueForPreDelivery(entry, deliverFirst);
}
}
}
@@ -809,38 +886,67 @@
{
if (_log.isTraceEnabled())
{
- _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
+ _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- msg.taken(_queue, s);
+
+ if (entry.taken(s))
+ {
+ //Message has been delivered so don't redeliver.
+ // This can currently occur because of the recursive call below
+ // During unit tests the send can occur
+ // client then rejects
+ // this reject then releases the message by the time the
+ // if(!msg.isTaken()) call is made below
+ // the message has been released so that thread loops to send the message again
+ // of course by the time it gets back to here. the thread that released the
+ // message is now ready to send it. Here is a sample trace for reference
+//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 msg:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]}
+//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: Subscriber:28398657
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 msg:(HC:5529738 ID:145 Ref:1)
+//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]}
+ // Note: In the last request to take the message from thread 4,5 the message has been
+ // taken by the previous call done by thread 2,5
+
+
+ return;
+ }
//Deliver the message
- s.send(msg, _queue);
+ s.send(entry, _queue);
}
else
{
if (debugEnabled)
{
_log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
- "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
+ "suspended between nextSubscriber and send for message:" + entry.getMessage().debugIdentity());
}
}
}
- if (!msg.isTaken(_queue))
+ if (!entry.isTaken())
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
+ _log.debug(debugIdentity() + " Message(" + entry.getMessage().debugIdentity() + ") has not been taken so recursing!:" +
" Subscriber:" + System.identityHashCode(s));
}
- deliver(context, name, msg, deliverFirst);
+ deliver(context, name, entry, deliverFirst);
}
else
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + " Message(" + msg.toString() +
+ _log.debug(debugIdentity() + " Message(" + entry.toString() +
") has been taken so disregarding deliver request to Subscriber:" +
System.identityHashCode(s));
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Feb 8 02:09:37 2008
@@ -64,13 +64,13 @@
*
* @param storeContext
* @param name the name of the entity on whose behalf we are delivering the message
- * @param msg the message to deliver
+ * @param entry the message to deliver
* @param deliverFirst
*
* @throws org.apache.qpid.server.queue.FailedDequeueException
* if the message could not be dequeued
*/
- void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
+ void deliver(StoreContext storeContext, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws FailedDequeueException, AMQException;
void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
@@ -78,15 +78,15 @@
void startMovingMessages();
- void enqueueMovedMessages(StoreContext context, List<AMQMessage> messageList);
+ void enqueueMovedMessages(StoreContext context, List<QueueEntry> messageList);
void stopMovingMessages();
- void removeMovedMessages(List<AMQMessage> messageListToRemove);
+ void removeMovedMessages(List<QueueEntry> messageListToRemove);
- List<AMQMessage> getMessages();
+ List<QueueEntry> getMessages();
- List<AMQMessage> getMessages(long fromMessageId, long toMessageId);
+ List<QueueEntry> getMessages(long fromMessageId, long toMessageId);
void populatePreDeliveryQueue(Subscription subscription);
@@ -96,5 +96,7 @@
long getOldestMessageArrival();
- void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg);
+ void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry msg);
+
+ void removeExpired() throws AMQException;
}
Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.log4j.Logger;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * Pairs an {@link AMQMessage} with the {@link AMQQueue} it is held on and tracks, for that
+ * pairing, which owner (if any) has currently taken the entry and which subscriptions have
+ * rejected it. Most accessors simply delegate to the wrapped message.
+ *
+ * NOTE(review): {@code _rejectedBy} is a plain {@link HashSet} that is created lazily without
+ * any synchronisation - confirm that callers serialise access to reject()/isRejectedBy().
+ */
+public class QueueEntry
+{
+
+    /** Used for debugging purposes. */
+    private static final Logger _log = Logger.getLogger(QueueEntry.class);
+
+    /** The queue this entry belongs to. */
+    private final AMQQueue _queue;
+
+    /** The message wrapped by this entry. */
+    private final AMQMessage _message;
+
+    /** Subscriptions that have rejected this entry; stays null until the first rejection. */
+    private Set<Subscription> _rejectedBy = null;
+
+    /**
+     * Current owner of the entry: null when not taken, a Subscription when taken by one, or
+     * this entry itself when taken without a subscription.
+     */
+    private AtomicReference<Object> _owner = new AtomicReference<Object>();
+
+
+    /**
+     * @param queue   the queue the message is held on
+     * @param message the message being wrapped
+     */
+    public QueueEntry(AMQQueue queue, AMQMessage message)
+    {
+        _queue = queue;
+        _message = message;
+    }
+
+
+    /** @return the queue supplied at construction */
+    public AMQQueue getQueue()
+    {
+        return _queue;
+    }
+
+    /** @return the message supplied at construction */
+    public AMQMessage getMessage()
+    {
+        return _message;
+    }
+
+    /** @return the size reported by the wrapped message */
+    public long getSize()
+    {
+        final AMQMessage message = getMessage();
+        return message.getSize();
+    }
+
+    /** @return the delivered-to-consumer flag of the wrapped message */
+    public boolean getDeliveredToConsumer()
+    {
+        final AMQMessage message = getMessage();
+        return message.getDeliveredToConsumer();
+    }
+
+    /**
+     * @return whatever the wrapped message reports for expiry on this entry's queue
+     *
+     * @throws AMQException if the wrapped message's expiry check throws
+     */
+    public boolean expired() throws AMQException
+    {
+        final AMQMessage message = getMessage();
+        return message.expired(_queue);
+    }
+
+    /** @return true if an owner is currently recorded for this entry */
+    public boolean isTaken()
+    {
+        final Object currentOwner = _owner.get();
+        return currentOwner != null;
+    }
+
+    /**
+     * Atomically tries to record an owner for this entry.
+     *
+     * @param sub the subscription taking the entry; when null the entry itself is recorded as owner
+     *
+     * @return true if the entry already had an owner (so this call did NOT take it),
+     *         false if this call successfully took the entry
+     */
+    public boolean taken(Subscription sub)
+    {
+        final Object claimant = (sub == null) ? this : sub;
+        final boolean claimed = _owner.compareAndSet(null, claimant);
+        return !claimed;
+    }
+
+    /** Sets the delivered-to-consumer flag on the wrapped message. */
+    public void setDeliveredToConsumer()
+    {
+        final AMQMessage message = getMessage();
+        message.setDeliveredToConsumer();
+    }
+
+    /** Clears the recorded owner so the entry can be taken again. */
+    public void release()
+    {
+        _owner.set(null);
+    }
+
+    /** @return the debug identity string of the wrapped message */
+    public String debugIdentity()
+    {
+        final AMQMessage message = getMessage();
+        return message.debugIdentity();
+    }
+
+    /**
+     * Hands this entry to its queue for processing.
+     *
+     * @param storeContext the store context passed through to the queue
+     * @param deliverFirst passed through to the queue unchanged
+     *
+     * @throws AMQException if the queue's processing throws
+     */
+    public void process(StoreContext storeContext, boolean deliverFirst) throws AMQException
+    {
+        _queue.process(storeContext, this, deliverFirst);
+    }
+
+    /**
+     * Delegates the delivered-to-consumer check to the wrapped message.
+     *
+     * @throws NoConsumersException if the wrapped message's check throws it
+     */
+    public void checkDeliveredToConsumer() throws NoConsumersException
+    {
+        _message.checkDeliveredToConsumer();
+    }
+
+    /** @param b the redelivered flag to set on the wrapped message */
+    public void setRedelivered(boolean b)
+    {
+        final AMQMessage message = getMessage();
+        message.setRedelivered(b);
+    }
+
+    /**
+     * @return the subscription currently recorded as owner, or null when the entry is not taken
+     *         or was taken without a subscription
+     */
+    public Subscription getDeliveredSubscription()
+    {
+        synchronized (this)
+        {
+            final Object owner = _owner.get();
+            return (owner instanceof Subscription) ? (Subscription) owner : null;
+        }
+    }
+
+    /** Records a rejection by whichever subscription currently owns this entry. */
+    public void reject()
+    {
+        final Subscription current = getDeliveredSubscription();
+        reject(current);
+    }
+
+    /**
+     * Records that the given subscription has rejected this entry.
+     *
+     * @param subscription the rejecting subscription; a null value is only logged as a warning
+     */
+    public void reject(Subscription subscription)
+    {
+        if (subscription == null)
+        {
+            _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+            return;
+        }
+
+        // Create the set on first use so entries that are never rejected carry no set.
+        if (_rejectedBy == null)
+        {
+            _rejectedBy = new HashSet<Subscription>();
+        }
+
+        _rejectedBy.add(subscription);
+    }
+
+    /**
+     * @param subscription the subscription to look up
+     *
+     * @return true if that subscription has previously rejected this entry; false otherwise,
+     *         including when this message has not been rejected by anyone yet
+     */
+    public boolean isRejectedBy(Subscription subscription)
+    {
+        return _rejectedBy != null && _rejectedBy.contains(subscription);
+    }
+
+
+}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Fri Feb 8 02:09:37 2008
@@ -27,7 +27,7 @@
public interface Subscription
{
- void send(AMQMessage msg, AMQQueue queue) throws AMQException;
+ void send(QueueEntry msg, AMQQueue queue) throws AMQException;
boolean isSuspended();
@@ -35,15 +35,15 @@
boolean filtersMessages();
- boolean hasInterest(AMQMessage msg);
+ boolean hasInterest(QueueEntry msg);
- Queue<AMQMessage> getPreDeliveryQueue();
+ Queue<QueueEntry> getPreDeliveryQueue();
- Queue<AMQMessage> getResendQueue();
+ Queue<QueueEntry> getResendQueue();
- Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);
+ Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages);
- void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst);
+ void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst);
boolean isAutoClose();
@@ -53,9 +53,9 @@
boolean isBrowser();
- boolean wouldSuspend(AMQMessage msg);
+ boolean wouldSuspend(QueueEntry msg);
- void addToResendQueue(AMQMessage msg);
+ void addToResendQueue(QueueEntry msg);
Object getSendLock();
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Feb 8 02:09:37 2008
@@ -58,9 +58,9 @@
private final Object _sessionKey;
- private MessageQueue<AMQMessage> _messages;
+ private MessageQueue<QueueEntry> _messages;
- private Queue<AMQMessage> _resendQueue;
+ private Queue<QueueEntry> _resendQueue;
private final boolean _noLocal;
@@ -160,7 +160,7 @@
if (filtersMessages())
{
- _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
+ _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
}
else
{
@@ -226,7 +226,7 @@
*
* @throws AMQException
*/
- public void send(AMQMessage msg, AMQQueue queue) throws AMQException
+ public void send(QueueEntry msg, AMQQueue queue) throws AMQException
{
if (msg != null)
{
@@ -245,7 +245,7 @@
}
}
- private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
+ private void sendToBrowser(QueueEntry msg, AMQQueue queue) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -266,11 +266,11 @@
_logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
}
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
}
}
- private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
+ private void sendToConsumer(StoreContext storeContext, QueueEntry entry, AMQQueue queue)
throws AMQException
{
try
@@ -287,9 +287,9 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ _logger.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
}
- queue.dequeue(storeContext, msg);
+ queue.dequeue(storeContext, entry);
}
synchronized (channel)
@@ -298,16 +298,20 @@
if (_sendLock.get())
{
- _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+ _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
}
if (_acks)
{
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ channel.addUnacknowledgedMessage(entry, deliveryTag, consumerTag);
}
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
+ if (!_acks)
+ {
+ entry.getMessage().decrementReference(storeContext);
+ }
}
}
finally
@@ -316,7 +320,7 @@
// using a try->finally would set it even if an error occurred.
// Is this what we want?
- msg.setDeliveredToConsumer();
+ entry.setDeliveredToConsumer();
}
}
@@ -351,19 +355,19 @@
return _filters != null || _noLocal;
}
- public boolean hasInterest(AMQMessage msg)
+ public boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
- if (msg.isRejectedBy(this))
+ if (entry.isRejectedBy(this))
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity());
+ _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
}
// return false;
}
- final AMQProtocolSession publisher = msg.getPublisher();
+ final AMQProtocolSession publisher = entry.getMessage().getPublisher();
//todo - client id should be recorded and this test removed but handled below
if (_noLocal && publisher != null)
@@ -414,9 +418,9 @@
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity());
+ _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
}
- return checkFilters(msg);
+ return checkFilters(entry);
}
@@ -427,7 +431,7 @@
return id;
}
- private boolean checkFilters(AMQMessage msg)
+ private boolean checkFilters(QueueEntry msg)
{
if (_filters != null)
{
@@ -435,7 +439,7 @@
// {
// _logger.trace("(" + debugIdentity() + ") has filters.");
// }
- return _filters.allAllow(msg);
+ return _filters.allAllow(msg.getMessage());
}
else
{
@@ -448,12 +452,12 @@
}
}
- public Queue<AMQMessage> getPreDeliveryQueue()
+ public Queue<QueueEntry> getPreDeliveryQueue()
{
return _messages;
}
- public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
+ public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst)
{
if (_messages != null)
{
@@ -557,19 +561,19 @@
while (!_resendQueue.isEmpty())
{
- AMQMessage resent = _resendQueue.poll();
+ QueueEntry resent = _resendQueue.poll();
if (_logger.isTraceEnabled())
{
_logger.trace("Removed for resending:" + resent.debugIdentity());
}
- resent.release(_queue);
+ resent.release();
_queue.subscriberHasPendingResend(false, this, resent);
try
{
- channel.getTransactionalContext().deliver(resent, _queue, true);
+ channel.getTransactionalContext().deliver(resent, true);
}
catch (AMQException e)
{
@@ -607,22 +611,22 @@
return _isBrowser;
}
- public boolean wouldSuspend(AMQMessage msg)
+ public boolean wouldSuspend(QueueEntry msg)
{
- return channel.wouldSuspend(msg);
+ return channel.wouldSuspend(msg.getMessage());
}
- public Queue<AMQMessage> getResendQueue()
+ public Queue<QueueEntry> getResendQueue()
{
if (_resendQueue == null)
{
- _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ _resendQueue = new ConcurrentLinkedQueueAtomicSize<QueueEntry>();
}
return _resendQueue;
}
- public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages)
{
if (_resendQueue != null && !_resendQueue.isEmpty())
{
@@ -647,7 +651,7 @@
}
}
- public void addToResendQueue(AMQMessage msg)
+ public void addToResendQueue(QueueEntry msg)
{
// add to our resend queue
getResendQueue().add(msg);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Fri Feb 8 02:09:37 2008
@@ -30,5 +30,5 @@
{
public List<Subscription> getSubscriptions();
public boolean hasActiveSubscribers();
- public Subscription nextSubscriber(AMQMessage msg);
+ public Subscription nextSubscriber(QueueEntry entry);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Fri Feb 8 02:09:37 2008
@@ -37,7 +37,8 @@
/** Used to control the round robin delivery of content */
private int _currentSubscriber;
- private final Object _subscriptionsChange = new Object();
+
+ private final Object _changeLock = new Object();
/** Accessor for unit tests. */
@@ -48,7 +49,7 @@
public void addSubscriber(Subscription subscription)
{
- synchronized (_subscriptionsChange)
+ synchronized (_changeLock)
{
_subscriptions.add(subscription);
}
@@ -66,7 +67,7 @@
// TODO: possibly need O(1) operation here.
Subscription sub = null;
- synchronized (_subscriptionsChange)
+ synchronized (_changeLock)
{
int subIndex = _subscriptions.indexOf(subscription);
@@ -113,7 +114,7 @@
* concurrently. Also note that because of race conditions and when subscriptions are removed between calls to
* nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning.
*/
- public Subscription nextSubscriber(AMQMessage msg)
+ public Subscription nextSubscriber(QueueEntry msg)
{
if (_subscriptions.isEmpty())
{
@@ -140,7 +141,7 @@
}
}
- private Subscription nextSubscriberImpl(AMQMessage msg)
+ private Subscription nextSubscriberImpl(QueueEntry msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
while (iterator.hasNext())
@@ -226,4 +227,11 @@
{
return _subscriptions.size();
}
+
+
+ /**
+ * Exposes the monitor object this set synchronizes on when its subscription list is changed
+ * (for example in addSubscriber).
+ *
+ * @return the change lock object
+ */
+ public Object getChangeLock()
+ {
+ return _changeLock;
+ }
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Fri Feb 8 02:09:37 2008
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,15 +20,14 @@
*/
package org.apache.qpid.server.registry;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* An abstract application registry that provides access to configuration information and handles the
* construction and caching of configurable objects.
@@ -59,24 +58,7 @@
public void run()
{
_logger.info("Shutting down application registries...");
- try
- {
- synchronized (_instanceMap)
- {
- Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator();
-
- while (keyIterator.hasNext())
- {
- IApplicationRegistry instance = keyIterator.next();
-
- instance.close();
- }
- }
- }
- catch (Exception e)
- {
- _logger.error("Error shutting down message store: " + e, e);
- }
+ removeAll();
}
}
@@ -121,6 +103,7 @@
}
catch (Exception e)
{
+ _logger.error("Error shutting down message store: " + e, e);
}
finally
@@ -130,6 +113,14 @@
}
}
+ /**
+ * Removes every registry instance currently held in the instance map by calling remove for each key.
+ * The keys are copied into an array first, so the loop works on a snapshot rather than on the live key set.
+ */
+ public static void removeAll()
+ {
+ for (Object instanceKey : _instanceMap.keySet().toArray())
+ {
+ remove((Integer) instanceKey);
+ }
+ }
protected ApplicationRegistry(Configuration configuration)
{
@@ -173,7 +164,7 @@
public void close() throws Exception
{
- for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
+ for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
{
virtualHost.close();
}
@@ -210,7 +201,6 @@
return instance;
}
-
public static void setDefaultApplicationRegistry(String clazz)
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Fri Feb 8 02:09:37 2008
@@ -34,6 +34,7 @@
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.management.ManagementConfiguration;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
@@ -60,6 +61,8 @@
private final Map<String, VirtualHost> _virtualHosts = new ConcurrentHashMap<String, VirtualHost>();
+ private PluginManager _pluginManager;
+
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
@@ -117,6 +120,8 @@
_managedObjectRegistry.start();
+ _pluginManager = new PluginManager(_configuration.getString("plugin-directory"));
+
initialiseVirtualHosts();
}
@@ -172,5 +177,10 @@
public Collection<String> getVirtualHostNames()
{
return getConfiguration().getList("virtualhosts.virtualhost.name");
+ }
+
+ /**
+ * Returns the plugin manager held by this registry; it is constructed from the
+ * "plugin-directory" configuration string.
+ *
+ * @return the plugin manager
+ */
+ public PluginManager getPluginManager()
+ {
+ return _pluginManager;
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Fri Feb 8 02:09:37 2008
@@ -24,6 +24,7 @@
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.access.AccessManager;
@@ -68,4 +69,7 @@
VirtualHostRegistry getVirtualHostRegistry();
AccessManager getAccessManager();
+
+ PluginManager getPluginManager();
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Feb 8 02:09:37 2008
@@ -28,37 +28,11 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicGetBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.handler.BasicAckMethodHandler;
import org.apache.qpid.server.handler.BasicCancelMethodHandler;
import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
@@ -107,43 +81,35 @@
* Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
* AMQFrame.
*/
- private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
+/* private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(
AMQState.class);
+ */
+
private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true, virtualHostRegistry, protocolSession);
- }
- protected AMQStateManager(AMQState initial, boolean register, VirtualHostRegistry virtualHostRegistry,
- AMQProtocolSession protocolSession)
- {
_virtualHostRegistry = virtualHostRegistry;
_protocolSession = protocolSession;
- _currentState = initial;
- if (register)
- {
- registerListeners();
- }
+ _currentState = AMQState.CONNECTION_NOT_STARTED;
+
}
+ /*
protected void registerListeners()
{
Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap;
frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
- frame2handlerMap.put(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance());
_state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
- frame2handlerMap.put(ConnectionSecureOkBody.class, ConnectionSecureOkMethodHandler.getInstance());
_state2HandlersMap.put(AMQState.CONNECTION_NOT_AUTH, frame2handlerMap);
frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
- frame2handlerMap.put(ConnectionTuneOkBody.class, ConnectionTuneOkMethodHandler.getInstance());
_state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
@@ -154,37 +120,41 @@
// ConnectionOpen handlers
//
frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
- frame2handlerMap.put(ChannelOpenBody.class, ChannelOpenHandler.getInstance());
- frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseHandler.getInstance());
- frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance());
- frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance());
- frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance());
- frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
- frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
- frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());
- frame2handlerMap.put(BasicGetBody.class, BasicGetMethodHandler.getInstance());
- frame2handlerMap.put(BasicCancelBody.class, BasicCancelMethodHandler.getInstance());
- frame2handlerMap.put(BasicPublishBody.class, BasicPublishMethodHandler.getInstance());
- frame2handlerMap.put(BasicQosBody.class, BasicQosHandler.getInstance());
- frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance());
- frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance());
- frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance());
- frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance());
- frame2handlerMap.put(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
- frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance());
- frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance());
- frame2handlerMap.put(TxRollbackBody.class, TxRollbackHandler.getInstance());
- frame2handlerMap.put(BasicRejectBody.class, BasicRejectMethodHandler.getInstance());
+ ChannelOpenHandler.getInstance();
+ ChannelCloseHandler.getInstance();
+ ChannelCloseOkHandler.getInstance();
+ ConnectionCloseMethodHandler.getInstance();
+ ConnectionCloseOkMethodHandler.getInstance();
+ ConnectionTuneOkMethodHandler.getInstance();
+ ConnectionSecureOkMethodHandler.getInstance();
+ ConnectionStartOkMethodHandler.getInstance();
+ ExchangeDeclareHandler.getInstance();
+ ExchangeDeleteHandler.getInstance();
+ ExchangeBoundHandler.getInstance();
+ BasicAckMethodHandler.getInstance();
+ BasicRecoverMethodHandler.getInstance();
+ BasicConsumeMethodHandler.getInstance();
+ BasicGetMethodHandler.getInstance();
+ BasicCancelMethodHandler.getInstance();
+ BasicPublishMethodHandler.getInstance();
+ BasicQosHandler.getInstance();
+ QueueBindHandler.getInstance();
+ QueueDeclareHandler.getInstance();
+ QueueDeleteHandler.getInstance();
+ QueuePurgeHandler.getInstance();
+ ChannelFlowHandler.getInstance();
+ TxSelectHandler.getInstance();
+ TxCommitHandler.getInstance();
+ TxRollbackHandler.getInstance();
+ BasicRejectMethodHandler.getInstance();
_state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
- frame2handlerMap.put(ConnectionCloseOkBody.class, ConnectionCloseOkMethodHandler.getInstance());
+
_state2HandlersMap.put(AMQState.CONNECTION_CLOSING, frame2handlerMap);
- }
+ } */
public AMQState getCurrentState()
{
@@ -214,18 +184,25 @@
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
- StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
- if (handler != null)
- {
+ MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher();
- checkChannel(evt, _protocolSession);
+ final int channelId = evt.getChannelId();
+ B body = evt.getMethod();
- handler.methodReceived(this, evt);
+ if(channelId != 0 && _protocolSession.getChannel(channelId)== null)
+ {
+
+ if(! ((body instanceof ChannelOpenBody)
+ || (body instanceof ChannelCloseOkBody)
+ || (body instanceof ChannelCloseBody)))
+ {
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed");
+ }
- return true;
}
- return false;
+ return body.execute(dispatcher, channelId);
+
}
private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
@@ -239,6 +216,7 @@
}
}
+/*
protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
B frame)
// throws IllegalStateTransitionException
@@ -260,6 +238,7 @@
return handler;
}
}
+*/
public void addStateListener(StateListener listener)
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java Fri Feb 8 02:09:37 2008
@@ -29,7 +29,7 @@
* the opportunity to update state.
*
*/
-public interface StateAwareMethodListener <B extends AMQMethodBody>
+public interface StateAwareMethodListener<B extends AMQMethodBody>
{
- void methodReceived(AMQStateManager stateManager, AMQMethodEvent<B> evt) throws AMQException;
+ void methodReceived(AMQStateManager stateManager, B evt, int channelId) throws AMQException;
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Feb 8 02:09:37 2008
@@ -20,27 +20,26 @@
*/
package org.apache.qpid.server.store;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-/**
- * A simple message store that stores the messages in a threadsafe structure in memory.
- */
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A simple message store that stores the messages in a threadsafe structure in memory. */
public class MemoryMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -54,6 +53,7 @@
protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
private final AtomicLong _messageId = new AtomicLong(1);
+ private AtomicBoolean _closed = new AtomicBoolean(false);
public void configure()
{
@@ -77,6 +77,7 @@
public void close() throws Exception
{
+ _closed.getAndSet(true);
if (_metaDataMap != null)
{
_metaDataMap.clear();
@@ -89,8 +90,9 @@
}
}
- public void removeMessage(StoreContext context, Long messageId)
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
{
+ checkNotClosed();
if (_log.isDebugEnabled())
{
_log.debug("Removing message with id " + messageId);
@@ -172,9 +174,10 @@
public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
throws AMQException
{
+ checkNotClosed();
List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
- if(bodyList == null && lastContentBody)
+ if (bodyList == null && lastContentBody)
{
_contentBodyMap.put(messageId, Collections.singletonList(contentBody));
}
@@ -193,17 +196,28 @@
public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
throws AMQException
{
+ checkNotClosed();
_metaDataMap.put(messageId, messageMetaData);
}
- public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
+ checkNotClosed();
return _metaDataMap.get(messageId);
}
public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
+ checkNotClosed();
List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
return bodyList.get(index);
+ }
+
+ /**
+ * Guard called at the start of store operations: fails once close() has set the closed flag.
+ *
+ * @throws MessageStoreClosedException if this store has been closed
+ */
+ private void checkNotClosed() throws MessageStoreClosedException
+ {
+ if (_closed.get())
+ {
+ throw new MessageStoreClosedException();
+ }
}
}
Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java Fri Feb 8 02:09:37 2008
@@ -0,0 +1,36 @@
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.AMQException;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * NOTE: this class currently extends AMQException, but AMQException should be
+ * reserved for protocol errors within the code base. The message store interface
+ * should therefore throw a different superclass, and this class should be
+ * changed to extend that superclass once it exists.
+ */
+public class MessageStoreClosedException extends AMQException
+{
+ public MessageStoreClosedException()
+ {
+ super(null, "Message store closed", null);
+ }
+}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java Fri Feb 8 02:09:37 2008
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,9 +23,12 @@
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.util.NewThreadExecutor;
import org.apache.qpid.configuration.Configured;
+import org.apache.log4j.Logger;
public class ConnectorConfiguration
{
+ private static final Logger _logger = Logger.getLogger(ConnectorConfiguration.class);
+
public static final String DEFAULT_PORT = "5672";
public static final String SSL_PORT = "8672";
@@ -41,7 +44,7 @@
@Configured(path = "connector.bind",
defaultValue = "wildcard")
public String bindAddress;
-
+
@Configured(path = "connector.socketReceiveBuffer",
defaultValue = "32767")
public int socketReceiveBufferSize;
@@ -69,29 +72,43 @@
@Configured(path = "connector.ssl.enabled",
defaultValue = "false")
public boolean enableSSL;
-
+
@Configured(path = "connector.ssl.sslOnly",
- defaultValue = "true")
+ defaultValue = "true")
public boolean sslOnly;
-
+
@Configured(path = "connector.ssl.port",
- defaultValue = SSL_PORT)
- public int sslPort;
-
+ defaultValue = SSL_PORT)
+ public int sslPort;
+
@Configured(path = "connector.ssl.keystorePath",
- defaultValue = "none")
+ defaultValue = "none")
public String keystorePath;
-
+
@Configured(path = "connector.ssl.keystorePassword",
- defaultValue = "none")
+ defaultValue = "none")
public String keystorePassword;
-
+
@Configured(path = "connector.ssl.certType",
- defaultValue = "SunX509")
+ defaultValue = "SunX509")
public String certType;
+ @Configured(path = "connector.qpidnio",
+ defaultValue = "true")
+ public boolean _multiThreadNIO;
+
+
public IoAcceptor createAcceptor()
{
- return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());
+ if (_multiThreadNIO)
+ {
+ _logger.warn("Using Qpid Multithreaded IO Processing");
+ return new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(processors, new NewThreadExecutor());
+ }
+ else
+ {
+ _logger.warn("Using Mina IO Processing");
+ return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor());
+ }
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java Fri Feb 8 02:09:37 2008
@@ -20,6 +20,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -138,16 +139,14 @@
// The message is now fully received, we can stage it before enqueued if necessary
}
- public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst)
+ public void deliver(QueueEntry entry, boolean deliverFirst)
throws
AMQException
{
try
{
- //The message has been delivered to the queues
- message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
// add a record in the transaction
- _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new EnqueueRecord(_storeContext, message, queue, deliverFirst));
+ _transactionManager.getTransaction((Xid) _storeContext.getPayload()).addRecord(new EnqueueRecord(_storeContext, entry, deliverFirst));
} catch (Exception e)
{
throw new AMQException(null, "Problem during transaction rollback", e);
@@ -177,7 +176,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + message.message.getMessageId());
+ _log.debug("Discarding message: " + message.entry.debugIdentity());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -203,7 +202,7 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + msg.message.getMessageId());
+ _log.debug("Discarding message: " + msg.entry.debugIdentity());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
dequeue(msg);
@@ -222,7 +221,7 @@
if (_log.isDebugEnabled())
{
- _log.debug("Discarding message: " + msg.message.getMessageId());
+ _log.debug("Discarding message: " + msg.entry.debugIdentity());
}
//Message has been ack so discard it. This will dequeue and decrement the reference.
@@ -231,7 +230,7 @@
if (_log.isDebugEnabled())
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
- msg.message.getMessageId());
+ msg.entry.debugIdentity());
}
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java Fri Feb 8 02:09:37 2008
@@ -24,6 +24,7 @@
import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
/**
@@ -34,15 +35,13 @@
public class EnqueueRecord implements TransactionRecord
{
private final StoreContext _storeContext;
- private final AMQMessage _msg;
- private final AMQQueue _queue;
+ private final QueueEntry _entry;
private final boolean _first;
- EnqueueRecord(StoreContext storeContext, AMQMessage msg, AMQQueue q, boolean firsr)
+ EnqueueRecord(StoreContext storeContext, QueueEntry entry, boolean firsr)
{
_storeContext = storeContext;
- _msg = msg;
- _queue = q;
+ _entry = entry;
_first = firsr;
}
@@ -52,7 +51,7 @@
{
try
{
- _queue.process(_storeContext, _msg, _first);
+ _entry.process(_storeContext, _first);
}
catch (AMQException e)
{