You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/12/19 00:00:43 UTC
svn commit: r605352 [1/2] - in /incubator/qpid/branches/M2.1/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/...
Author: rgodfrey
Date: Tue Dec 18 15:00:40 2007
New Revision: 605352
URL: http://svn.apache.org/viewvc?rev=605352&view=rev
Log:
QPID-711 : create a QueueEntry class and move message-on-queue functions (such as taken()) to this class
Added:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Dec 18 15:00:40 2007
@@ -36,10 +36,7 @@
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.Subscription;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -421,33 +418,32 @@
/**
* Add a message to the channel-based list of unacknowledged messages
*
- * @param message the message that was delivered
+ * @param entry the record of the message on the queue that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
* @param consumerTag The tag for the consumer that is to acknowledge this message.
- * @param queue the queue from which the message was delivered
*/
- public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
+ public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, AMQShortString consumerTag)
{
if (_log.isDebugEnabled())
{
- if (queue == null)
+ if (entry.getQueue() == null)
{
- _log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
+ _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity());
}
else
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag
- + ") with a queue(" + queue + ") for " + consumerTag);
+ _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
+ + ") with a queue(" + entry.getQueue() + ") for " + consumerTag);
}
}
}
synchronized (_unacknowledgedMessageMap.getLock())
{
- _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+ _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag));
checkSuspension();
}
}
@@ -499,16 +495,16 @@
for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
- if (unacked.queue != null)
+ if (!unacked.isQueueDeleted())
{
// Ensure message is released for redelivery
- unacked.message.release(unacked.queue);
+ unacked.entry.release();
// Mark message redelivered
- unacked.message.setRedelivered(true);
+ unacked.getMessage().setRedelivered(true);
// Deliver Message
- deliveryContext.deliver(unacked.message, unacked.queue, false);
+ deliveryContext.deliver(unacked.entry, false);
// Should we allow access To the DM to directy deliver the message?
// As we don't need to check for Consumers or worry about incrementing the message count?
@@ -533,13 +529,13 @@
{
// Ensure message is released for redelivery
- if (unacked.queue != null)
+ if (!unacked.isQueueDeleted())
{
- unacked.message.release(unacked.queue);
+ unacked.entry.release();
}
// Mark message redelivered
- unacked.message.setRedelivered(true);
+ unacked.getMessage().setRedelivered(true);
// Deliver these messages out of the transaction as their delivery was never
// part of the transaction only the receive.
@@ -559,16 +555,16 @@
deliveryContext = _txnContext;
}
- if (unacked.queue != null)
+ if (!unacked.isQueueDeleted())
{
// Redeliver the messages to the front of the queue
- deliveryContext.deliver(unacked.message, unacked.queue, true);
+ deliveryContext.deliver(unacked.entry, true);
// Deliver increments the message count but we have already deliverted this once so don't increment it again
// this was because deliver did an increment changed this.
}
else
{
- _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity()
+ _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
// _log.error("Requested requeue of message:" + deliveryTag +
// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
@@ -591,7 +587,7 @@
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
_log.debug(
- (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]");
+ (count++) + ": (" + message.getMessage().debugIdentity() + ")" + "[" + message.deliveryTag + "]");
return false; // Continue
}
@@ -630,7 +626,7 @@
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
AMQShortString consumerTag = message.consumerTag;
- AMQMessage msg = message.message;
+ AMQMessage msg = message.getMessage();
msg.setRedelivered(true);
if (consumerTag != null)
{
@@ -649,7 +645,7 @@
// Message has no consumer tag, so was "delivered" to a GET
// or consumer no longer registered
// cannot resend, so re-queue.
- if (message.queue != null)
+ if (!message.isQueueDeleted())
{
if (requeue)
{
@@ -690,7 +686,7 @@
for (UnacknowledgedMessage message : msgToResend)
{
- AMQMessage msg = message.message;
+ AMQMessage msg = message.getMessage();
// Our Java Client will always suspend the channel when resending!
// If the client has requested the messages be resent then it is
@@ -705,13 +701,13 @@
// else
// {
// release to allow it to be delivered
- msg.release(message.queue);
+ message.entry.release();
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
msg.setRedelivered(true);
- Subscription sub = msg.getDeliveredSubscription(message.queue);
+ Subscription sub = message.entry.getDeliveredSubscription();
if (sub != null)
{
@@ -741,7 +737,7 @@
+ System.identityHashCode(sub));
}
- sub.addToResendQueue(msg);
+ sub.addToResendQueue(message.entry);
_unacknowledgedMessageMap.remove(message.deliveryTag);
}
} // sync(sub.getSendLock)
@@ -789,10 +785,10 @@
// Process Messages to Requeue at the front of the queue
for (UnacknowledgedMessage message : msgToRequeue)
{
- message.message.release(message.queue);
- message.message.setRedelivered(true);
+ message.entry.release();
+ message.entry.setRedelivered(true);
- deliveryContext.deliver(message.message, message.queue, true);
+ deliveryContext.deliver(message.entry, true);
_unacknowledgedMessageMap.remove(message.deliveryTag);
}
@@ -813,17 +809,18 @@
{
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- if (message.queue == queue)
+ if (message.getQueue() == queue)
{
try
{
message.discard(_storeContext);
- message.queue = null;
+ message.setQueueDeleted(true);
+
}
catch (AMQException e)
{
_log.error(
- "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e);
+ "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Tue Dec 18 15:00:40 2007
@@ -87,7 +87,7 @@
//buffer must be marked as persistent:
for (UnacknowledgedMessage msg : _unacked)
{
- if (msg.message.isPersistent())
+ if (msg.getMessage().isPersistent())
{
return true;
}
@@ -100,7 +100,7 @@
//make persistent changes, i.e. dequeue and decrementReference
for (UnacknowledgedMessage msg : _unacked)
{
- msg.restoreTransientMessageData();
+ //msg.restoreTransientMessageData();
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
@@ -116,7 +116,7 @@
for (UnacknowledgedMessage msg : _unacked)
{
msg.clearTransientMessageData();
- msg.message.takeReference();
+ msg.getMessage().takeReference();
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java Tue Dec 18 15:00:40 2007
@@ -24,19 +24,21 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoreContext;
public class UnacknowledgedMessage
{
- public final AMQMessage message;
+ public final QueueEntry entry;
public final AMQShortString consumerTag;
public final long deliveryTag;
- public AMQQueue queue;
- public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, AMQShortString consumerTag, long deliveryTag)
+ private boolean _queueDeleted;
+
+
+ public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, long deliveryTag)
{
- this.queue = queue;
- this.message = message;
+ this.entry = entry;
this.consumerTag = consumerTag;
this.deliveryTag = deliveryTag;
}
@@ -45,9 +47,9 @@
{
StringBuilder sb = new StringBuilder();
sb.append("Q:");
- sb.append(queue);
+ sb.append(entry.getQueue());
sb.append(" M:");
- sb.append(message);
+ sb.append(entry.getMessage());
sb.append(" CT:");
sb.append(consumerTag);
sb.append(" DT:");
@@ -58,22 +60,42 @@
public void discard(StoreContext storeContext) throws AMQException
{
- if (queue != null)
+ if (entry.getQueue() != null)
{
- queue.dequeue(storeContext, message);
+ entry.getQueue().dequeue(storeContext, entry);
}
//if the queue is null then the message is waiting to be acked, but has been removed.
- message.decrementReference(storeContext);
+ entry.getMessage().decrementReference(storeContext);
}
public void restoreTransientMessageData() throws AMQException
{
- message.restoreTransientMessageData();
+ entry.getMessage().restoreTransientMessageData();
}
public void clearTransientMessageData()
{
- message.clearTransientMessageData();
+ entry.getMessage().clearTransientMessageData();
+ }
+
+ public AMQMessage getMessage()
+ {
+ return entry.getMessage();
+ }
+
+ public AMQQueue getQueue()
+ {
+ return entry.getQueue();
+ }
+
+ public void setQueueDeleted(boolean queueDeleted)
+ {
+ _queueDeleted = queueDeleted;
+ }
+
+ public boolean isQueueDeleted()
+ {
+ return _queueDeleted;
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Tue Dec 18 15:00:40 2007
@@ -97,7 +97,7 @@
UnacknowledgedMessage message = _map.remove(deliveryTag);
if(message != null)
{
- _unackedSize -= message.message.getSize();
+ _unackedSize -= message.getMessage().getSize();
}
return message;
@@ -127,7 +127,7 @@
synchronized (_lock)
{
_map.put(deliveryTag, message);
- _unackedSize += message.message.getSize();
+ _unackedSize += message.getMessage().getSize();
_lastDeliveryTag = deliveryTag;
}
}
@@ -186,7 +186,7 @@
}
it.remove();
- _unackedSize -= unacked.getValue().message.getSize();
+ _unackedSize -= unacked.getValue().getMessage().getSize();
destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Tue Dec 18 15:00:40 2007
@@ -143,7 +143,7 @@
public AMQShortString getDefaultExchangeName()
{
- return ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ return ExchangeDefaults.FANOUT_EXCHANGE_NAME;
}
};
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Tue Dec 18 15:00:40 2007
@@ -85,7 +85,7 @@
}
else
{
- if (message.queue == null || message.queue.isDeleted())
+ if (message.isQueueDeleted() || message.getQueue().isDeleted())
{
_logger.warn("Message's Queue as already been purged, unable to Reject. " +
"Dropping message should use Dead Letter Queue");
@@ -93,7 +93,7 @@
return;
}
- if (!message.message.isReferenced())
+ if (!message.getMessage().isReferenced())
{
_logger.warn("Message as already been purged, unable to Reject.");
return;
@@ -102,7 +102,7 @@
if (_logger.isTraceEnabled())
{
- _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.message.debugIdentity() +
+ _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
@@ -111,7 +111,7 @@
// If we haven't requested message to be resent to this consumer then reject it from ever getting it.
//if (!evt.getMethod().resend)
{
- message.message.reject(message.message.getDeliveredSubscription(message.queue));
+ message.entry.reject();
}
if (body.getRequeue())
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Dec 18 15:00:40 2007
@@ -77,22 +77,13 @@
/** Flag to indicate that this message requires 'immediate' delivery. */
private boolean _immediate;
- // private Subscription _takenBySubcription;
- // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
- //todo: this should be part of a messageOnQueue object
- private Set<Subscription> _rejectedBy = null;
+ private long _expiration;
- //todo: this should be part of a messageOnQueue object
- private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
- //todo: this should be part of a messageOnQueue object
- private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
private final int hashcode = System.identityHashCode(this);
- //todo: this should be part of a messageOnQueue object
- private long _expiration;
public String debugIdentity()
{
@@ -282,7 +273,7 @@
setContentHeaderBody(contentHeader);
}
- /**
+ /* *
* Used in testing only. This allows the passing of the content header and some body fragments on construction.
*
* @param messageId
@@ -293,7 +284,7 @@
* @param contentBodies
*
* @throws AMQException
- */
+ */ /*
public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
@@ -306,7 +297,7 @@
addContentBodyFrame(storeContext, cb);
}
}
-
+ */
protected AMQMessage(AMQMessage msg) throws AMQException
{
_messageId = msg._messageId;
@@ -485,84 +476,6 @@
return _deliveredToConsumer;
}
- public boolean isTaken(AMQQueue queue)
- {
- // return _taken.get();
-
- synchronized (this)
- {
- AtomicBoolean taken = _takenMap.get(queue);
- if (taken == null)
- {
- taken = new AtomicBoolean(false);
- _takenMap.put(queue, taken);
- }
-
- return taken.get();
- }
- }
-
- public boolean taken(AMQQueue queue, Subscription sub)
- {
- // if (_taken.getAndSet(true))
- // {
- // return true;
- // }
- // else
- // {
- // _takenBySubcription = sub;
- // return false;
- // }
-
- synchronized (this)
- {
- AtomicBoolean taken = _takenMap.get(queue);
- if (taken == null)
- {
- taken = new AtomicBoolean(false);
- }
-
- if (taken.getAndSet(true))
- {
- return true;
- }
- else
- {
- _takenMap.put(queue, taken);
- _takenBySubcriptionMap.put(queue, sub);
-
- return false;
- }
- }
- }
-
- public void release(AMQQueue queue)
- {
- if (_log.isTraceEnabled())
- {
- _log.trace("Releasing Message:" + debugIdentity());
- }
-
- // _taken.set(false);
- // _takenBySubcription = null;
-
- synchronized (this)
- {
- AtomicBoolean taken = _takenMap.get(queue);
- if (taken == null)
- {
- taken = new AtomicBoolean(false);
- }
- else
- {
- taken.set(false);
- }
-
- _deliveredToConsumer = false;
- _takenMap.put(queue, taken);
- _takenBySubcriptionMap.put(queue, null);
- }
- }
public boolean checkToken(Object token)
{
@@ -683,7 +596,6 @@
*/
public boolean expired(AMQQueue queue) throws AMQException
{
- // note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
if (_expiration != 0L)
{
@@ -732,7 +644,7 @@
// Increment the references to this message for each queue delivery.
incrementReference();
// normal deliver so add this message at the end.
- _txnContext.deliver(this, q, false);
+ _txnContext.deliver(q.createEntry(this), false);
}
}
finally
@@ -743,175 +655,6 @@
}
}
- /*
- public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
- if (bodyCount == 0)
- {
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
-
- protocolSession.writeFrame(compositeBlock);
- }
- else
- {
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for (int i = 1; i < bodyCount; i++)
- {
- cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
- }
-
-
- }
-
-
- }
-
- public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
- if (bodyCount == 0)
- {
- SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
- protocolSession.writeFrame(compositeBlock);
- }
- else
- {
-
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
-
- //
- // Now start writing out the other content bodies
- //
- for (int i = 1; i < bodyCount; i++)
- {
- cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
- }
-
-
- }
-
-
- }
-
-
- private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
- deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
- pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.getExchange(),
- queueSize,
- _messageHandle.isRedelivered(),
- pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
- getOkFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
- {
- AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
- getMessagePublishInfo().getExchange(),
- replyCode, replyText,
- getMessagePublishInfo().getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
- returnFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
- throws AMQException
- {
- ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
-
- AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
- getContentHeaderBody());
-
- Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
- new AMQDataBlock[]{contentHeader});
- protocolSession.writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- protocolSession.writeFrame(bodyFrameIterator.next());
- }
- }
- */
public AMQMessageHandle getMessageHandle()
{
@@ -954,47 +697,7 @@
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
// _taken + " by :" + _takenBySubcription;
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
- + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
- }
-
- public Subscription getDeliveredSubscription(AMQQueue queue)
- {
- // return _takenBySubcription;
- synchronized (this)
- {
- return _takenBySubcriptionMap.get(queue);
- }
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount;
}
- public void reject(Subscription subscription)
- {
- if (subscription != null)
- {
- if (_rejectedBy == null)
- {
- _rejectedBy = new HashSet<Subscription>();
- }
-
- _rejectedBy.add(subscription);
- }
- else
- {
- _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
- }
- }
-
- public boolean isRejectedBy(Subscription subscription)
- {
- boolean rejected = _rejectedBy != null;
-
- if (rejected) // We have subscriptions that rejected this message
- {
- return _rejectedBy.contains(subscription);
- }
- else // This messasge hasn't been rejected yet.
- {
- return rejected;
- }
- }
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Dec 18 15:00:40 2007
@@ -49,6 +49,7 @@
*/
public class AMQQueue implements Managable, Comparable
{
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
* already exists.
@@ -250,7 +251,7 @@
}
/** @return List of messages(undelivered) on the queue. */
- public List<AMQMessage> getMessagesOnTheQueue()
+ public List<QueueEntry> getMessagesOnTheQueue()
{
return _deliveryMgr.getMessages();
}
@@ -263,7 +264,7 @@
*
* @return List of messages
*/
- public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
+ public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
{
return _deliveryMgr.getMessages(fromMessageId, toMessageId);
}
@@ -276,11 +277,11 @@
/**
* @param messageId
*
- * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
+ * @return the QueueEntry with the given id if it exists, or null if no QueueEntry with that id is on the queue.
*/
- public AMQMessage getMessageOnTheQueue(long messageId)
+ public QueueEntry getMessageOnTheQueue(long messageId)
{
- List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId);
+ List<QueueEntry> list = getMessagesOnTheQueue(messageId, messageId);
if ((list == null) || (list.size() == 0))
{
return null;
@@ -319,15 +320,16 @@
toQueue.startMovingMessages();
// Get the list of messages to move.
- List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+ List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
try
{
fromStore.beginTran(storeContext);
// Move the messages in on the message store.
- for (AMQMessage message : foundMessagesList)
+ for (QueueEntry entry : foundMessagesList)
{
+ AMQMessage message = entry.getMessage();
fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
}
@@ -397,15 +399,16 @@
toQueue.startMovingMessages();
// Get the list of messages to move.
- List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+ List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
try
{
fromStore.beginTran(storeContext);
// Move the messages in on the message store.
- for (AMQMessage message : foundMessagesList)
+ for (QueueEntry entry : foundMessagesList)
{
+ AMQMessage message = entry.getMessage();
toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
message.takeReference();
}
@@ -463,15 +466,16 @@
startMovingMessages();
// Get the list of messages to move.
- List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+ List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
try
{
fromStore.beginTran(storeContext);
// remove the messages in on the message store.
- for (AMQMessage message : foundMessagesList)
+ for (QueueEntry entry : foundMessagesList)
{
+ AMQMessage message = entry.getMessage();
fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
}
@@ -513,7 +517,7 @@
_deliveryMgr.startMovingMessages();
}
- private void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> messageList)
+ private void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> messageList)
{
_deliveryMgr.enqueueMovedMessages(storeContext, messageList);
_totalMessagesReceived.addAndGet(messageList.size());
@@ -583,7 +587,7 @@
}
- /** Removes the AMQMessage from the top of the queue. */
+ /** Removes the QueueEntry from the top of the queue. */
public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
_deliveryMgr.removeAMessageFromTop(storeContext, this);
@@ -798,27 +802,28 @@
// return _deliveryMgr;
// }
- public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+ public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
{
- _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
+ AMQMessage msg = entry.getMessage();
+ _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
- updateReceivedMessageCount(msg);
+ updateReceivedMessageCount(entry);
}
catch (NoConsumersException e)
{
// as this message will be returned, it should be removed
// from the queue:
- dequeue(storeContext, msg);
+ dequeue(storeContext, entry);
}
}
- public void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
+ public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
try
{
- msg.dequeue(storeContext, this);
+ entry.getMessage().dequeue(storeContext, this);
}
catch (MessageCleanupException e)
{
@@ -844,8 +849,10 @@
return _subscribers;
}
- protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
+ protected void updateReceivedMessageCount(QueueEntry entry) throws AMQException
{
+ AMQMessage msg = entry.getMessage();
+
if (!msg.isRedelivered())
{
_totalMessagesReceived.incrementAndGet();
@@ -933,8 +940,14 @@
_maximumMessageAge = maximumMessageAge;
}
- public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, AMQMessage msg)
+ public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
{
- _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
+ _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, entry);
}
+
+ public QueueEntry createEntry(AMQMessage amqMessage)
+ {
+ return new QueueEntry(this, amqMessage);
+ }
+
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Tue Dec 18 15:00:40 2007
@@ -321,11 +321,14 @@
*/
public CompositeData viewMessageContent(long msgId) throws JMException
{
- AMQMessage msg = _queue.getMessageOnTheQueue(msgId);
- if (msg == null)
+ QueueEntry entry = _queue.getMessageOnTheQueue(msgId);
+
+ if (entry == null)
{
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
+
+ AMQMessage msg = entry.getMessage();
// get message content
Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
List<Byte> msgContent = new ArrayList<Byte>();
@@ -381,7 +384,7 @@
+ "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
}
- List<AMQMessage> list = _queue.getMessagesOnTheQueue();
+ List<QueueEntry> list = _queue.getMessagesOnTheQueue();
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
try
@@ -389,7 +392,7 @@
// Create the tabular list of message header contents
for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
- AMQMessage msg = list.get(i - 1);
+ AMQMessage msg = list.get(i - 1).getMessage();
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
String[] headerAttributes = getMessageHeaderProperties(headerBody);
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Dec 18 15:00:40 2007
@@ -55,7 +55,7 @@
defaultValue = "false")
public boolean compressBufferOnQueue;
/** Holds any queued messages */
- private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
+ private final MessageQueue<QueueEntry> _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>();
/** Ensures that only one asynchronous task is running for this manager at any time. */
private final AtomicBoolean _processing = new AtomicBoolean();
@@ -107,8 +107,9 @@
}
- private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst)
+ private boolean addMessageToQueue(QueueEntry entry, boolean deliverFirst)
{
+ AMQMessage msg = entry.getMessage();
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
{
@@ -124,12 +125,12 @@
{
synchronized (_queueHeadLock)
{
- _messages.pushHead(msg);
+ _messages.pushHead(entry);
}
}
else
{
- _messages.offer(msg);
+ _messages.offer(entry);
}
_totalMessageSize.addAndGet(msg.getSize());
@@ -175,11 +176,11 @@
public long getOldestMessageArrival()
{
- AMQMessage msg = _messages.peek();
- return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+ QueueEntry entry = _messages.peek();
+ return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
}
- public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg)
+ public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry entry)
{
_lock.lock();
try
@@ -188,19 +189,19 @@
{
_log.debug("Queue has adding subscriber content");
_hasContent.add(subscription);
- _totalMessageSize.addAndGet(msg.getSize());
+ _totalMessageSize.addAndGet(entry.getSize());
_extraMessages.addAndGet(1);
}
else
{
_log.debug("Queue has removing subscriber content");
- if (msg == null)
+ if (entry == null)
{
_hasContent.remove(subscription);
}
else
{
- _totalMessageSize.addAndGet(-msg.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
_extraMessages.addAndGet(-1);
}
}
@@ -222,14 +223,14 @@
*
* @return List of messages
*/
- public List<AMQMessage> getMessages()
+ public List<QueueEntry> getMessages()
{
_lock.lock();
- List<AMQMessage> list = new ArrayList<AMQMessage>();
+ List<QueueEntry> list = new ArrayList<QueueEntry>();
- for (AMQMessage message : _messages)
+ for (QueueEntry entry : _messages)
{
- list.add(message);
+ list.add(entry);
}
_lock.unlock();
@@ -244,7 +245,7 @@
*
* @return
*/
- public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
+ public List<QueueEntry> getMessages(long fromMessageId, long toMessageId)
{
if (fromMessageId <= 0 || toMessageId <= 0)
{
@@ -255,14 +256,14 @@
_lock.lock();
- List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+ List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
- for (AMQMessage message : _messages)
+ for (QueueEntry entry : _messages)
{
- long msgId = message.getMessageId();
+ long msgId = entry.getMessage().getMessageId();
if (msgId >= fromMessageId && msgId <= toMessageId)
{
- foundMessagesList.add(message);
+ foundMessagesList.add(entry);
}
// break if the no of messages are found
if (foundMessagesList.size() == maxMessageCount)
@@ -282,16 +283,17 @@
_log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
}
- Iterator<AMQMessage> currentQueue = _messages.iterator();
+ Iterator<QueueEntry> currentQueue = _messages.iterator();
while (currentQueue.hasNext())
{
- AMQMessage message = currentQueue.next();
- if (!message.getDeliveredToConsumer())
+ QueueEntry entry = currentQueue.next();
+
+ if (!entry.getDeliveredToConsumer())
{
- if (subscription.hasInterest(message))
+ if (subscription.hasInterest(entry))
{
- subscription.enqueueForPreDelivery(message, false);
+ subscription.enqueueForPreDelivery(entry, false);
}
}
}
@@ -299,8 +301,8 @@
public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
{
- AMQMessage msg = getNextMessage();
- if (msg == null)
+ QueueEntry entry = getNextMessage();
+ if (entry == null)
{
return false;
}
@@ -322,9 +324,9 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ _log.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
}
- _queue.dequeue(channel.getStoreContext(), msg);
+ _queue.dequeue(channel.getStoreContext(), entry);
}
synchronized (channel)
{
@@ -332,22 +334,22 @@
if (acks)
{
- channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
+ channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
- protocolSession.getProtocolOutputConverter().writeGetOk(msg, channel.getChannelId(),
+ protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
deliveryTag, _queue.getMessageCount());
- _totalMessageSize.addAndGet(-msg.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
}
if (!acks)
{
- msg.decrementReference(channel.getStoreContext());
+ entry.getMessage().decrementReference(channel.getStoreContext());
}
}
finally
{
- msg.setDeliveredToConsumer();
+ entry.setDeliveredToConsumer();
}
return true;
@@ -381,7 +383,7 @@
*
* @param messageList
*/
- public void removeMovedMessages(List<AMQMessage> messageList)
+ public void removeMovedMessages(List<QueueEntry> messageList)
{
// Remove from the
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -391,20 +393,20 @@
{
if (!sub.isSuspended() && sub.filtersMessages())
{
- Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
- for (AMQMessage msg : messageList)
+ Queue<QueueEntry> preDeliveryQueue = sub.getPreDeliveryQueue();
+ for (QueueEntry entry : messageList)
{
- preDeliveryQueue.remove(msg);
+ preDeliveryQueue.remove(entry);
}
}
}
}
- for (AMQMessage msg : messageList)
+ for (QueueEntry entry : messageList)
{
- if (_messages.remove(msg))
+ if (_messages.remove(entry))
{
- _totalMessageSize.getAndAdd(-msg.getSize());
+ _totalMessageSize.getAndAdd(-entry.getSize());
}
}
}
@@ -420,16 +422,16 @@
{
_lock.lock();
- AMQMessage message = _messages.poll();
+ QueueEntry entry = _messages.poll();
- if (message != null)
+ if (entry != null)
{
- queue.dequeue(storeContext, message);
+ queue.dequeue(storeContext, entry);
- _totalMessageSize.addAndGet(-message.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
//If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
- message.decrementReference(storeContext);
+ entry.getMessage().decrementReference(storeContext);
}
@@ -443,17 +445,17 @@
synchronized (_queueHeadLock)
{
- AMQMessage msg = getNextMessage();
- while (msg != null)
+ QueueEntry entry = getNextMessage();
+ while (entry != null)
{
//and remove it
_messages.poll();
- _queue.dequeue(storeContext, msg);
+ _queue.dequeue(storeContext, entry);
- msg.decrementReference(_reapingStoreContext);
+ entry.getMessage().decrementReference(_reapingStoreContext);
- msg = getNextMessage();
+ entry = getNextMessage();
count++;
}
_totalMessageSize.set(0L);
@@ -469,34 +471,35 @@
*
* @throws org.apache.qpid.AMQException
*/
- private AMQMessage getNextMessage() throws AMQException
+ private QueueEntry getNextMessage() throws AMQException
{
return getNextMessage(_messages, null, false);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub, boolean purgeOnly) throws AMQException
+ private QueueEntry getNextMessage(Queue<QueueEntry> messages, Subscription sub, boolean purgeOnly) throws AMQException
{
- AMQMessage message = messages.peek();
+ QueueEntry entry = messages.peek();
//while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.)
- while (purgeMessage(message, sub, purgeOnly))
+ while (purgeMessage(entry, sub, purgeOnly))
{
+ AMQMessage message = entry.getMessage();
// if we are purging then ensure we mark this message taken for the current subscriber
// the current subscriber may be null in the case of a get or a purge but this is ok.
// boolean alreadyTaken = message.taken(_queue, sub);
//remove the already taken message or expired
- AMQMessage removed = messages.poll();
+ QueueEntry removed = messages.poll();
- assert removed == message;
+ assert removed == entry;
// if the message expired then the _totalMessageSize needs adjusting
- if (message.expired(_queue) && !message.getDeliveredToConsumer())
+ if (message.expired(_queue) && !entry.getDeliveredToConsumer())
{
- _totalMessageSize.addAndGet(-message.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- _queue.dequeue(_reapingStoreContext, message);
+ _queue.dequeue(_reapingStoreContext, entry);
message.decrementReference(_reapingStoreContext);
@@ -515,10 +518,10 @@
}
// try the next message
- message = messages.peek();
+ entry = messages.peek();
}
- return message;
+ return entry;
}
/**
@@ -534,7 +537,7 @@
*
* @throws AMQException
*/
- private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
+ private boolean purgeMessage(QueueEntry message, Subscription sub) throws AMQException
{
return purgeMessage(message, sub, false);
}
@@ -552,7 +555,7 @@
*
* @throws AMQException
*/
- private boolean purgeMessage(AMQMessage message, Subscription sub, boolean purgeOnly) throws AMQException
+ private boolean purgeMessage(QueueEntry message, Subscription sub, boolean purgeOnly) throws AMQException
{
//Original.. complicated while loop control
// (message != null
@@ -567,7 +570,7 @@
if (message != null)
{
// Check that the message hasn't expired.
- if (message.expired(_queue))
+ if (message.expired())
{
return true;
}
@@ -576,13 +579,13 @@
if (sub != null)
{
// if we have a queue browser(we don't purge) so check mark the message as taken
- purge = ((!sub.isBrowser() || message.isTaken(_queue)));
+ purge = ((!sub.isBrowser() || message.isTaken()));
}
else
{
// if there is no subscription we are doing
// a get or purging so mark message as taken.
- message.isTaken(_queue);
+ message.isTaken();
// and then ensure that it gets purged
purge = true;
}
@@ -592,20 +595,20 @@
{
// If we are simply purging the queue don't take the message
// just purge up to the next non-taken msg.
- return purge && message.isTaken(_queue);
+ return purge && message.isTaken();
}
else
{
// if we are purging then ensure we mark this message taken for the current subscriber
// the current subscriber may be null in the case of a get or a purge but this is ok.
- return purge && message.taken(_queue, sub);
+ return purge && message.taken(sub);
}
}
- public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
+ public void sendNextMessage(Subscription sub, AMQQueue queue)
{
- Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+ Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
if (_log.isTraceEnabled())
{
@@ -624,16 +627,16 @@
return;
}
- AMQMessage message = null;
- AMQMessage removed = null;
+ QueueEntry entry = null;
+ QueueEntry removed = null;
try
{
synchronized (_queueHeadLock)
{
- message = getNextMessage(messageQueue, sub, false);
+ entry = getNextMessage(messageQueue, sub, false);
// message will be null if we have no messages in the messageQueue.
- if (message == null)
+ if (entry == null)
{
if (_log.isTraceEnabled())
{
@@ -643,7 +646,7 @@
}
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Async Delivery Message :" + message + "(" + System.identityHashCode(message) +
+ _log.debug(debugIdentity() + "Async Delivery Message :" + entry + "(" + System.identityHashCode(entry) +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
@@ -651,10 +654,10 @@
if (messageQueue == _messages)
{
- _totalMessageSize.addAndGet(-message.getSize());
+ _totalMessageSize.addAndGet(-entry.getSize());
}
- sub.send(message, _queue);
+ sub.send(entry, _queue);
//remove sent message from our queue.
removed = messageQueue.poll();
@@ -662,14 +665,14 @@
// Otherwise the Async send will never end
}
- if (removed != message)
+ if (removed != entry)
{
- _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed);
+ _log.error("Just send message:" + entry.getMessage().debugIdentity() + " BUT removed this from queue:" + removed);
}
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message +
+ _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.getMessage().debugIdentity() + "d:" + entry +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
@@ -704,9 +707,9 @@
}
catch (AMQException e)
{
- if (message != null)
+ if (entry != null)
{
- message.release(_queue);
+ entry.release();
}
else
{
@@ -734,23 +737,23 @@
* @param storeContext
* @param movedMessageList
*/
- public void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> movedMessageList)
+ public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList)
{
_lock.lock();
- for (AMQMessage msg : movedMessageList)
+ for (QueueEntry entry : movedMessageList)
{
- addMessageToQueue(msg, false);
+ addMessageToQueue(entry, false);
}
// enqueue on the pre delivery queues
for (Subscription sub : _subscriptions.getSubscriptions())
{
- for (AMQMessage msg : movedMessageList)
+ for (QueueEntry entry : movedMessageList)
{
// Only give the message to those that want them.
- if (sub.hasInterest(msg))
+ if (sub.hasInterest(entry))
{
- sub.enqueueForPreDelivery(msg, true);
+ sub.enqueueForPreDelivery(entry, true);
}
}
}
@@ -802,30 +805,30 @@
}
- public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
+ public void deliver(StoreContext context, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws AMQException
{
final boolean debugEnabled = _log.isDebugEnabled();
if (debugEnabled)
{
- _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
+ _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + entry);
}
//Check if we have someone to deliver the message to.
_lock.lock();
try
{
- Subscription s = _subscriptions.nextSubscriber(msg);
+ Subscription s = _subscriptions.nextSubscriber(entry);
if (s == null || (!s.filtersMessages() && hasQueuedMessages())) //no-one can take the message right now or we're queueing
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
+ _log.debug(debugIdentity() + "Testing Message(" + entry + ") for Queued Delivery:" + currentStatus());
}
- if (!msg.getMessagePublishInfo().isImmediate())
+ if (!entry.getMessage().getMessagePublishInfo().isImmediate())
{
- addMessageToQueue(msg, deliverFirst);
+ addMessageToQueue(entry, deliverFirst);
//release lock now message is on queue.
_lock.unlock();
@@ -840,25 +843,25 @@
{
// stop if the message gets delivered whilst PreDelivering if we have a shared queue.
- if (_queue.isShared() && msg.getDeliveredToConsumer())
+ if (_queue.isShared() && entry.getDeliveredToConsumer())
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+ _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(entry) +
") is already delivered.");
}
continue;
}
// Only give the message to those that want them.
- if (sub.hasInterest(msg))
+ if (sub.hasInterest(entry))
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(entry) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
- sub.enqueueForPreDelivery(msg, deliverFirst);
+ sub.enqueueForPreDelivery(entry, deliverFirst);
}
}
@@ -893,11 +896,11 @@
{
if (_log.isTraceEnabled())
{
- _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
+ _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- if (msg.taken(_queue, s))
+ if (entry.taken(s))
{
//Message has been delivered so don't redeliver.
// This can currently occur because of the recursive call below
@@ -927,14 +930,14 @@
return;
}
//Deliver the message
- s.send(msg, _queue);
+ s.send(entry, _queue);
}
else
{
if (debugEnabled)
{
_log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
- "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
+ "suspended between nextSubscriber and send for message:" + entry.getMessage().debugIdentity());
}
}
}
@@ -943,21 +946,21 @@
// Why do we do this? What was the reasoning? We should have a better approach
// than recursion and rejecting if someone else sends it before we do.
//
- if (!msg.isTaken(_queue))
+ if (!entry.isTaken())
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
+ _log.debug(debugIdentity() + " Message(" + entry.getMessage().debugIdentity() + ") has not been taken so recursing!:" +
" Subscriber:" + System.identityHashCode(s));
}
- deliver(context, name, msg, deliverFirst);
+ deliver(context, name, entry, deliverFirst);
}
else
{
if (debugEnabled)
{
- _log.debug(debugIdentity() + " Message(" + msg.toString() +
+ _log.debug(debugIdentity() + " Message(" + entry.toString() +
") has been taken so disregarding deliver request to Subscriber:" +
System.identityHashCode(s));
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Tue Dec 18 15:00:40 2007
@@ -64,13 +64,13 @@
*
* @param storeContext
* @param name the name of the entity on whose behalf we are delivering the message
- * @param msg the message to deliver
+ * @param entry the message to deliver
* @param deliverFirst
*
* @throws org.apache.qpid.server.queue.FailedDequeueException
* if the message could not be dequeued
*/
- void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
+ void deliver(StoreContext storeContext, AMQShortString name, QueueEntry entry, boolean deliverFirst) throws FailedDequeueException, AMQException;
void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException;
@@ -78,15 +78,15 @@
void startMovingMessages();
- void enqueueMovedMessages(StoreContext context, List<AMQMessage> messageList);
+ void enqueueMovedMessages(StoreContext context, List<QueueEntry> messageList);
void stopMovingMessages();
- void removeMovedMessages(List<AMQMessage> messageListToRemove);
+ void removeMovedMessages(List<QueueEntry> messageListToRemove);
- List<AMQMessage> getMessages();
+ List<QueueEntry> getMessages();
- List<AMQMessage> getMessages(long fromMessageId, long toMessageId);
+ List<QueueEntry> getMessages(long fromMessageId, long toMessageId);
void populatePreDeliveryQueue(Subscription subscription);
@@ -96,5 +96,5 @@
long getOldestMessageArrival();
- void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg);
+ void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry msg);
}
Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=605352&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue Dec 18 15:00:40 2007
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.log4j.Logger;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * A single message as it sits on one particular queue.
+ *
+ * Pairs an {@link AMQMessage} with the {@link AMQQueue} that holds it and keeps the state that belongs to that
+ * pairing: the current owner of the entry (claimed through {@link #taken(Subscription)} and cleared through
+ * {@link #release()}) and the set of subscriptions that have rejected it. Size, expiry, delivered and
+ * redelivered state are delegated to the wrapped message.
+ */
+public class QueueEntry
+{
+
+ /**
+ * Used for debugging purposes.
+ */
+ private static final Logger _log = Logger.getLogger(QueueEntry.class);
+
+ /** The queue this entry belongs to. */
+ private final AMQQueue _queue;
+
+ /** The message wrapped by this entry. */
+ private final AMQMessage _message;
+
+ /**
+ * Subscriptions that have rejected this entry; created lazily on the first rejection.
+ * NOTE(review): plain HashSet with no locking in this class - confirm callers serialise access.
+ */
+ private Set<Subscription> _rejectedBy = null;
+
+ /**
+ * Current owner of the entry: null when not taken, the taking Subscription, or this entry itself when it was
+ * taken without a subscription.
+ */
+ private final AtomicReference<Object> _owner = new AtomicReference<Object>();
+
+
+ /**
+ * Creates an entry for the given message on the given queue.
+ *
+ * @param queue the queue holding the message
+ * @param message the message being held
+ */
+ public QueueEntry(AMQQueue queue, AMQMessage message)
+ {
+ _queue = queue;
+ _message = message;
+ }
+
+
+ /** @return the queue this entry belongs to. */
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ /** @return the message wrapped by this entry. */
+ public AMQMessage getMessage()
+ {
+ return _message;
+ }
+
+ /** @return the size reported by the wrapped message. */
+ public long getSize()
+ {
+ return getMessage().getSize();
+ }
+
+ /** @return the delivered-to-consumer flag of the wrapped message. */
+ public boolean getDeliveredToConsumer()
+ {
+ return getMessage().getDeliveredToConsumer();
+ }
+
+ /**
+ * Asks the wrapped message whether it has expired with respect to this entry's queue.
+ *
+ * @return the result of the message's expiry check
+ *
+ * @throws AMQException if the message's expiry check fails
+ */
+ public boolean expired() throws AMQException
+ {
+ return getMessage().expired(_queue);
+ }
+
+ /** @return true if the entry currently has an owner. */
+ public boolean isTaken()
+ {
+ return _owner.get() != null;
+ }
+
+ /**
+ * Atomically tries to claim this entry.
+ *
+ * @param sub the subscription claiming the entry; when null the entry itself is recorded as the owner so that
+ * the entry still counts as taken
+ *
+ * @return true if the entry was ALREADY taken (this call changed nothing), false if this call claimed it
+ */
+ public boolean taken(Subscription sub)
+ {
+ // A failed compare-and-set means somebody else owns the entry already.
+ return !(_owner.compareAndSet(null, sub == null ? this : sub));
+ }
+
+ /** Sets the delivered-to-consumer flag on the wrapped message. */
+ public void setDeliveredToConsumer()
+ {
+ getMessage().setDeliveredToConsumer();
+ }
+
+ /** Clears the owner so that the entry can be taken again. */
+ public void release()
+ {
+ _owner.set(null);
+ }
+
+ /** @return the debug identity of the wrapped message. */
+ public String debugIdentity()
+ {
+ return getMessage().debugIdentity();
+ }
+
+ /**
+ * Hands this entry to its queue for processing.
+ *
+ * @param storeContext the store context passed on to the queue
+ * @param deliverFirst passed on to the queue unchanged
+ *
+ * @throws AMQException if the queue fails to process the entry
+ */
+ public void process(StoreContext storeContext, boolean deliverFirst) throws AMQException
+ {
+ _queue.process(storeContext, this, deliverFirst);
+ }
+
+ /**
+ * Delegates to the wrapped message's delivered-to-consumer check.
+ *
+ * @throws NoConsumersException if raised by the message's check
+ */
+ public void checkDeliveredToConsumer() throws NoConsumersException
+ {
+ getMessage().checkDeliveredToConsumer();
+ }
+
+ /**
+ * Sets the redelivered flag on the wrapped message.
+ *
+ * @param b the redelivered state to record on the message
+ */
+ public void setRedelivered(boolean b)
+ {
+ // Pass the caller's value through; a hard-coded true would make setRedelivered(false) mark the message.
+ getMessage().setRedelivered(b);
+ }
+
+ /**
+ * @return the subscription that currently owns this entry, or null if the entry is not taken or was taken
+ * without a subscription.
+ */
+ public Subscription getDeliveredSubscription()
+ {
+ synchronized (this)
+ {
+ Object owner = _owner.get();
+ if (owner instanceof Subscription)
+ {
+ return (Subscription) owner;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }
+
+ /** Records a rejection by the subscription that currently owns this entry. */
+ public void reject()
+ {
+ reject(getDeliveredSubscription());
+ }
+
+ /**
+ * Records that the given subscription has rejected this entry. A null subscription is not recorded; a warning
+ * is logged instead.
+ *
+ * @param subscription the rejecting subscription
+ */
+ public void reject(Subscription subscription)
+ {
+ if (subscription != null)
+ {
+ if (_rejectedBy == null)
+ {
+ _rejectedBy = new HashSet<Subscription>();
+ }
+
+ _rejectedBy.add(subscription);
+ }
+ else
+ {
+ _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+ }
+ }
+
+ /**
+ * @param subscription the subscription to look up
+ *
+ * @return true if the given subscription has rejected this entry, false otherwise (including when nothing has
+ * rejected the entry yet).
+ */
+ public boolean isRejectedBy(Subscription subscription)
+ {
+ return _rejectedBy != null && _rejectedBy.contains(subscription);
+ }
+
+
+}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?rev=605352&r1=605351&r2=605352&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Tue Dec 18 15:00:40 2007
@@ -27,7 +27,7 @@
public interface Subscription
{
- void send(AMQMessage msg, AMQQueue queue) throws AMQException;
+ void send(QueueEntry msg, AMQQueue queue) throws AMQException;
boolean isSuspended();
@@ -35,15 +35,15 @@
boolean filtersMessages();
- boolean hasInterest(AMQMessage msg);
+ boolean hasInterest(QueueEntry msg);
- Queue<AMQMessage> getPreDeliveryQueue();
+ Queue<QueueEntry> getPreDeliveryQueue();
- Queue<AMQMessage> getResendQueue();
+ Queue<QueueEntry> getResendQueue();
- Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);
+ Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages);
- void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst);
+ void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst);
boolean isAutoClose();
@@ -53,9 +53,9 @@
boolean isBrowser();
- boolean wouldSuspend(AMQMessage msg);
+ boolean wouldSuspend(QueueEntry msg);
- void addToResendQueue(AMQMessage msg);
+ void addToResendQueue(QueueEntry msg);
Object getSendLock();