You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC
svn commit: r686136 [7/17] - in /incubator/qpid/branches/qpid.0-10/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/bin/ broker/etc/ broker/...
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Aug 14 20:40:49 2008
@@ -25,131 +25,55 @@
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.exchange.Exchange;
-import java.util.HashMap;
-import java.util.HashSet;
+
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A deliverable message.
*/
-public class AMQMessage
+public class AMQMessage implements Filterable<AMQException>
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- /** Used in clustering. @todo What for? */
- private Set<Object> _tokens;
-
- /** Only use in clustering. @todo What for? */
- private AMQProtocolSession _publisher;
-
- private final Long _messageId;
-
private final AtomicInteger _referenceCount = new AtomicInteger(1);
- private AMQMessageHandle _messageHandle;
+ private final AMQMessageHandle _messageHandle;
/** Holds the transactional context in which this message is being processed. */
- private TransactionalContext _txnContext;
+ private StoreContext _storeContext;
+
+ /** Flag to indicate that this message requires 'immediate' delivery. */
+
+ private static final byte IMMEDIATE = 0x01;
/**
* Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
* for messages published with the 'immediate' flag.
*/
- private boolean _deliveredToConsumer;
- /** Flag to indicate that this message requires 'immediate' delivery. */
- private boolean _immediate;
+ private static final byte DELIVERED_TO_CONSUMER = 0x02;
- private TransientMessageData _transientMessageData = new TransientMessageData();
+ private byte _flags = 0;
private long _expiration;
+ private final long _size;
+ private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
+ private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
- private Exchange _exchange;
- private static final boolean SYNCED_CLOCKS =
- ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false);
-
- private static final long UNKNOWN_SIZE = Long.MIN_VALUE;
-
- private long _size = UNKNOWN_SIZE;
-
-
-
- public String debugIdentity()
- {
- return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
- }
-
- public void setExpiration()
- {
- long expiration =
- ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
- long timestamp =
- ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
-
- if (SYNCED_CLOCKS)
- {
- _expiration = expiration;
- }
- else
- {
- // Update TTL to be in broker time.
- if (expiration != 0L)
- {
- if (timestamp != 0L)
- {
- // todo perhaps use arrival time
- long diff = (System.currentTimeMillis() - timestamp);
-
- if ((diff > 1000L) || (diff < 1000L))
- {
- _expiration = expiration + diff;
- }
- }
- }
- }
-
- }
-
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
-
- public void setExchange(final Exchange exchange)
- {
- _exchange = exchange;
- }
-
- public void route() throws AMQException
- {
- _exchange.route(this);
- }
-
- public void enqueue(final List<AMQQueue> queues)
- {
- _transientMessageData.setDestinationQueues(queues);
- }
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
@@ -172,7 +96,7 @@
{
try
{
- return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
+ return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
}
catch (AMQException e)
{
@@ -189,7 +113,7 @@
AMQBody cb =
getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
- _messageId, ++_index));
+ ++_index));
return new AMQFrame(_channel, cb);
}
@@ -212,9 +136,14 @@
}
}
+ public void clearStoreContext()
+ {
+ _storeContext = new StoreContext();
+ }
+
public StoreContext getStoreContext()
{
- return _txnContext.getStoreContext();
+ return _storeContext;
}
private class BodyContentIterator implements Iterator<ContentChunk>
@@ -226,7 +155,7 @@
{
try
{
- return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
+ return _index < (_messageHandle.getBodyCount(getStoreContext()) - 1);
}
catch (AMQException e)
{
@@ -240,7 +169,7 @@
{
try
{
- return _messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index);
+ return _messageHandle.getContentChunk(getStoreContext(), ++_index);
}
catch (AMQException e)
{
@@ -254,13 +183,7 @@
}
}
- public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext)
- {
- _messageId = messageId;
- _txnContext = txnContext;
- _immediate = info.isImmediate();
- _transientMessageData.setMessagePublishInfo(info);
- }
+
/**
* Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
@@ -276,141 +199,85 @@
public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
throws AMQException
{
- _messageId = messageId;
_messageHandle = factory.createMessageHandle(messageId, store, true);
- _txnContext = txnConext;
- _transientMessageData = null;
-
+ _storeContext = txnConext.getStoreContext();
+ _size = _messageHandle.getBodySize(txnConext.getStoreContext());
}
- /**
- * Used in testing only. This allows the passing of the content header immediately on construction.
- *
- * @param messageId
- * @param info
- * @param txnContext
- * @param contentHeader
- */
- public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
- ContentHeaderBody contentHeader) throws AMQException
- {
- this(messageId, info, txnContext);
- setContentHeaderBody(contentHeader);
- }
-
- /* *
- * Used in testing only. This allows the passing of the content header and some body fragments on construction.
+ /**
+ * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
+ * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
+ * queues.
*
- * @param messageId
- * @param info
- * @param txnContext
- * @param contentHeader
- * @param destinationQueues
- * @param contentBodies
+ * @param messageHandle
*
* @throws AMQException
- */ /*
- public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
- ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
- MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
+ */
+ public AMQMessage(
+ AMQMessageHandle messageHandle,
+ StoreContext storeConext,
+ MessagePublishInfo info)
+ throws AMQException
{
- this(messageId, info, txnContext, contentHeader);
- _transientMessageData.setDestinationQueues(destinationQueues);
- routingComplete(messageStore, storeContext, messageHandleFactory);
- for (ContentChunk cb : contentBodies)
+ _messageHandle = messageHandle;
+ _storeContext = storeConext;
+
+ if(info.isImmediate())
{
- addContentBodyFrame(storeContext, cb);
+ _flags |= IMMEDIATE;
}
+ _size = messageHandle.getBodySize(storeConext);
+
}
- */
+
+
protected AMQMessage(AMQMessage msg) throws AMQException
{
- _messageId = msg._messageId;
_messageHandle = msg._messageHandle;
- _txnContext = msg._txnContext;
- _deliveredToConsumer = msg._deliveredToConsumer;
- _transientMessageData = msg._transientMessageData;
- }
+ _storeContext = msg._storeContext;
+ _flags = msg._flags;
+ _size = msg._size;
- public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
- {
- return new BodyFrameIterator(protocolSession, channel);
}
- public Iterator<ContentChunk> getContentBodyIterator()
+
+ public String debugIdentity()
{
- return new BodyContentIterator();
+ return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() + " Ref:" + _referenceCount.get() + ")";
}
- public ContentHeaderBody getContentHeaderBody() throws AMQException
+ public void setExpiration(final long expiration)
{
- if (_transientMessageData != null)
- {
- return _transientMessageData.getContentHeaderBody();
- }
- else
- {
- return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
- }
+
+ _expiration = expiration;
+
}
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
+ public boolean isReferenced()
{
- _transientMessageData.setContentHeaderBody(contentHeaderBody);
- _size = _transientMessageData.getContentHeaderBody().bodySize;
+ return _referenceCount.get() > 0;
}
- public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
- throws AMQException
+ public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
- final boolean persistent = isPersistent();
- _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
- if (persistent)
- {
- _txnContext.beginTranIfNecessary();
- }
-
- // enqueuing the messages ensure that if required the destinations are recorded to a
- // persistent store
-
- for (AMQQueue q : _transientMessageData.getDestinationQueues())
- {
- _messageHandle.enqueue(storeContext, _messageId, q);
- }
-
- if (_transientMessageData.getContentHeaderBody().bodySize == 0)
- {
- deliver(storeContext);
- }
-
-
+ return new BodyFrameIterator(protocolSession, channel);
}
- public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException
+ public Iterator<ContentChunk> getContentBodyIterator()
{
- _transientMessageData.addBodyLength(contentChunk.getSize());
- final boolean allContentReceived = isAllContentReceived();
- _messageHandle.addContentBodyFrame(storeContext, _messageId, contentChunk, allContentReceived);
- if (allContentReceived)
- {
- deliver(storeContext);
-
- return true;
- }
- else
- {
- return false;
- }
+ return new BodyContentIterator();
}
- public boolean isAllContentReceived() throws AMQException
+ public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- return _transientMessageData.isAllContentReceived();
+ return _messageHandle.getContentHeaderBody(getStoreContext());
}
+
+
public Long getMessageId()
{
- return _messageId;
+ return _messageHandle.getMessageId();
}
/**
@@ -424,14 +291,24 @@
return this;
}
- /** Threadsafe. Increment the reference count on the message. */
- public void incrementReference()
+ public boolean incrementReference()
+ {
+ return incrementReference(1);
+ }
+
+ /* Threadsafe. Increment the reference count on the message. */
+ public boolean incrementReference(int count)
{
- _referenceCount.incrementAndGet();
- // if (_log.isDebugEnabled())
- // {
- // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
- // }
+ if(_referenceCount.addAndGet(count) <= 1)
+ {
+ _referenceCount.addAndGet(-count);
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+
}
/**
@@ -445,6 +322,7 @@
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
+
int count = _referenceCount.decrementAndGet();
// note that the operation of decrementing the reference count and then removing the message does not
@@ -453,25 +331,25 @@
// not relying on the all the increments having taken place before the delivery manager decrements.
if (count == 0)
{
+ // Set the reference count far below zero so that we can detect that the message has been deleted.
+ // This guards against the message being spontaneously recreated (from the management console)
+ // by copying from other queues at the same time as it is being removed.
+ _referenceCount.set(Integer.MIN_VALUE/2);
+
try
{
- // if (_log.isDebugEnabled())
- // {
- // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
- // }
-
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
if (_messageHandle != null)
{
- _messageHandle.removeMessage(storeContext, _messageId);
+ _messageHandle.removeMessage(storeContext);
}
}
catch (AMQException e)
{
// to maintain consistency, we revert the count
incrementReference();
- throw new MessageCleanupException(_messageId, e);
+ throw new MessageCleanupException(getMessageId(), e);
}
}
else
@@ -484,15 +362,6 @@
}
}
- public void setPublisher(AMQProtocolSession publisher)
- {
- _publisher = publisher;
- }
-
- public AMQProtocolSession getPublisher()
- {
- return _publisher;
- }
/**
* Called selectors to determin if the message has already been sent
@@ -501,101 +370,30 @@
*/
public boolean getDeliveredToConsumer()
{
- return _deliveredToConsumer;
- }
-
-
- public boolean checkToken(Object token)
- {
-
- if (_tokens == null)
- {
- _tokens = new HashSet<Object>();
- }
-
- if (_tokens.contains(token))
- {
- return true;
- }
- else
- {
- _tokens.add(token);
-
- return false;
- }
- }
-
- /**
- * Registers a queue to which this message is to be delivered. This is called from the exchange when it is routing
- * the message. This will be called before any content bodies have been received so that the choice of
- * AMQMessageHandle implementation can be picked based on various criteria.
- *
- * @param queue the queue
- *
- * @throws org.apache.qpid.AMQException if there is an error enqueuing the message
- */
- public void enqueue(AMQQueue queue) throws AMQException
- {
- _transientMessageData.addDestinationQueue(queue);
- }
-
- /**
- * NOTE: Think about why you are using this method. Normal usages would want to do
- * AMQQueue.dequeue(StoreContext, AMQMessage)
- * This will keep the queue statistics up-to-date.
- * Currently this method is only called _correctly_ from AMQQueue dequeue.
- * Ideally we would have a better way for the queue to dequeue the message.
- * Especially since enqueue isn't the recipriocal of this method.
- * @deprecated
- * @param storeContext
- * @param queue
- * @throws AMQException
- */
- void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
- {
- _messageHandle.dequeue(storeContext, _messageId, queue);
+ return (_flags & DELIVERED_TO_CONSUMER) != 0;
}
public boolean isPersistent() throws AMQException
{
- if (_transientMessageData != null)
- {
- return _transientMessageData.isPersistent();
- }
- else
- {
- return _messageHandle.isPersistent(getStoreContext(), _messageId);
- }
+ return _messageHandle.isPersistent();
}
/**
* Called to enforce the 'immediate' flag.
*
- * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
+ * @return true if the message is marked for immediate delivery but has not been marked as delivered
* to a consumer
*/
- public void checkDeliveredToConsumer() throws NoConsumersException
+ public boolean immediateAndNotDelivered()
{
- if (_immediate && !_deliveredToConsumer)
- {
- throw new NoConsumersException(this);
- }
+ return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
+
}
public MessagePublishInfo getMessagePublishInfo() throws AMQException
{
- MessagePublishInfo pb;
- if (_transientMessageData != null)
- {
- pb = _transientMessageData.getMessagePublishInfo();
- }
- else
- {
- pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
- }
-
- return pb;
+ return _messageHandle.getMessagePublishInfo(getStoreContext());
}
public boolean isRedelivered()
@@ -641,46 +439,9 @@
*/
public void setDeliveredToConsumer()
{
- _deliveredToConsumer = true;
+ _flags |= DELIVERED_TO_CONSUMER;
}
- private void deliver(StoreContext storeContext) throws AMQException
- {
- // we get a reference to the destination queues now so that we can clear the
- // transient message data as quickly as possible
- List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
- if (_log.isDebugEnabled())
- {
- _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
- }
-
- try
- {
- // first we allow the handle to know that the message has been fully received. This is useful if it is
- // maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
- _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
-
- // we then allow the transactional context to do something with the message content
- // now that it has all been received, before we attempt delivery
- _txnContext.messageFullyReceived(isPersistent());
-
- for (AMQQueue q : destinationQueues)
- {
- // Increment the references to this message for each queue delivery.
- incrementReference();
- // normal deliver so add this message at the end.
- _txnContext.deliver(q.createEntry(this), false);
- }
- }
- finally
- {
-
- // Remove refence for routing process . Reference count should now == delivered queue count
- decrementReference(storeContext);
- _transientMessageData = null;
- }
- }
public AMQMessageHandle getMessageHandle()
@@ -690,28 +451,23 @@
public long getSize()
{
- if(_size == UNKNOWN_SIZE)
- {
- try
- {
- _size = getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- _log.warn("Unable to retrieve message meta data for message:" + this, e);
- return 0;
- }
- }
return _size;
+
+ }
+
+ public Object getPublisherClientInstance()
+ {
+ return _sessionIdentifier.getSessionInstance();
+ }
+
+ public Object getPublisherIdentifier()
+ {
+ return _sessionIdentifier.getSessionIdentifier();
}
- public void restoreTransientMessageData() throws AMQException
+ public void setClientIdentifier(final AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier)
{
- TransientMessageData transientMessageData = new TransientMessageData();
- transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
- transientMessageData.setContentHeaderBody(getContentHeaderBody());
- transientMessageData.addBodyLength(getContentHeaderBody().getSize());
- _transientMessageData = transientMessageData;
+ _sessionIdentifier = sessionIdentifier;
}
@@ -720,7 +476,7 @@
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
// _taken + " by :" + _takenBySubcription;
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount;
+ return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + _referenceCount;
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Thu Aug 14 20:40:49 2008
@@ -29,23 +29,27 @@
/**
* A pluggable way of getting message data. Implementations can provide intelligent caching for example or
* even no caching at all to minimise the broker memory footprint.
- *
- * The method all take a messageId to avoid having to store it in the instance - the AMQMessage container
- * must already keen the messageId so it is pointless storing it twice.
*/
public interface AMQMessageHandle
{
- ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException;
+ ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException;
+
+ /**
+ *
+ * @return the messageId for the message associated with this handle
+ */
+ Long getMessageId();
+
/**
* @return the number of body frames associated with this message
*/
- int getBodyCount(StoreContext context, Long messageId) throws AMQException;
+ int getBodyCount(StoreContext context) throws AMQException;
/**
* @return the size of the body
*/
- long getBodySize(StoreContext context, Long messageId) throws AMQException;
+ long getBodySize(StoreContext context) throws AMQException;
/**
* Get a particular content body
@@ -53,27 +57,23 @@
* @return a content body
* @throws IllegalArgumentException if the index is invalid
*/
- ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
+ ContentChunk getContentChunk(StoreContext context, int index) throws IllegalArgumentException, AMQException;
- void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody) throws AMQException;
+ void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody) throws AMQException;
- MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException;
+ MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException;
boolean isRedelivered();
void setRedelivered(boolean redelivered);
- boolean isPersistent(StoreContext context, Long messageId) throws AMQException;
+ boolean isPersistent();
- void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
+ void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
throws AMQException;
- void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
-
- void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
-
- void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
+ void removeMessage(StoreContext storeContext) throws AMQException;
long getArrivalTime();
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Aug 14 20:40:49 2008
@@ -20,1005 +20,191 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
-import javax.management.JMException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
- * fully in RFC 006.
- */
-public class AMQQueue implements Managable, Comparable
-{
-
- /**
- * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- public static final class ExistingExclusiveSubscription extends AMQException
- {
-
- public ExistingExclusiveSubscription()
- {
- super("");
- }
- }
-
- /**
- * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- public static final class ExistingSubscriptionPreventsExclusive extends AMQException
- {
- public ExistingSubscriptionPreventsExclusive()
- {
- super("");
- }
- }
-
- private static final Logger _logger = Logger.getLogger(AMQQueue.class);
-
- private final AMQShortString _name;
-
- /** null means shared */
- private final AMQShortString _owner;
-
- private final boolean _durable;
-
- /** If true, this queue is deleted when the last subscriber is removed */
- private final boolean _autoDelete;
-
- /** Holds subscribers to the queue. */
- private final SubscriptionSet _subscribers;
-
- private final SubscriptionFactory _subscriptionFactory;
-
- private final AtomicInteger _subscriberCount = new AtomicInteger();
-
- private final AtomicBoolean _isExclusive = new AtomicBoolean();
-
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
-
- private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
-
- /** Manages message delivery. */
- private final DeliveryManager _deliveryMgr;
-
- /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
- private final ExchangeBindings _bindings = new ExchangeBindings(this);
-
- /** Executor on which asynchronous delivery will be carriedout where required */
- private final Executor _asyncDelivery;
-
- private final AMQQueueMBean _managedObject;
-
- private final VirtualHost _virtualHost;
-
- /** max allowed size(KB) of a single message */
- @Configured(path = "maximumMessageSize", defaultValue = "0")
- public long _maximumMessageSize;
-
- /** max allowed number of messages on a queue. */
- @Configured(path = "maximumMessageCount", defaultValue = "0")
- public long _maximumMessageCount;
-
- /** max queue depth for the queue */
- @Configured(path = "maximumQueueDepth", defaultValue = "0")
- public long _maximumQueueDepth;
-
- /** maximum message age before alerts occur */
- @Configured(path = "maximumMessageAge", defaultValue = "0")
- public long _maximumMessageAge;
-
- /** the minimum interval between sending out consequetive alerts of the same type */
- @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
- public long _minimumAlertRepeatGap;
-
- /** total messages received by the queue since startup. */
- public AtomicLong _totalMessagesReceived = new AtomicLong();
-
-
- private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
-
-
- public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
- {
- this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(),
- new SubscriptionSet(), new SubscriptionImpl.Factory());
- }
-
- protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
- VirtualHost virtualHost, SubscriptionSet subscribers) throws AMQException
- {
- this(name, durable, owner, autoDelete, virtualHost, AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers,
- new SubscriptionImpl.Factory());
- }
-
- protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete,
- VirtualHost virtualHost, Executor asyncDelivery, SubscriptionSet subscribers,
- SubscriptionFactory subscriptionFactory) throws AMQException
- {
- if (name == null)
- {
- throw new IllegalArgumentException("Queue name must not be null");
- }
-
- if (virtualHost == null)
- {
- throw new IllegalArgumentException("Virtual Host must not be null");
- }
-
- _name = name;
- _durable = durable;
- _owner = owner;
- _autoDelete = autoDelete;
- _virtualHost = virtualHost;
- _asyncDelivery = asyncDelivery;
-
- _managedObject = createMBean();
- _managedObject.register();
-
- _subscribers = subscribers;
- _subscriptionFactory = subscriptionFactory;
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
-
- // This ensure that the notification checks for the configured alerts are created.
- setMaximumMessageAge(_maximumMessageAge);
- setMaximumMessageCount(_maximumMessageCount);
- setMaximumMessageSize(_maximumMessageSize);
- setMaximumQueueDepth(_maximumQueueDepth);
-
- }
-
- private AMQQueueMBean createMBean() throws AMQException
- {
- try
- {
- return new AMQQueueMBean(this);
- }
- catch (JMException ex)
- {
- throw new AMQException("AMQQueue MBean creation has failed ", ex);
- }
- }
-
- public final AMQShortString getName()
- {
- return _name;
- }
+import java.util.List;
+import java.util.Set;
- public boolean isShared()
- {
- return _owner == null;
- }
+public interface AMQQueue extends Managable, Comparable<AMQQueue>
+{
- public boolean isDurable()
- {
- return _durable;
- }
+ AMQShortString getName();
- public AMQShortString getOwner()
- {
- return _owner;
- }
+ boolean isDurable();
- public boolean isAutoDelete()
- {
- return _autoDelete;
- }
+ boolean isAutoDelete();
- public boolean isDeleted()
- {
- return _deleted.get();
- }
+ AMQShortString getOwner();
- /** @return no of messages(undelivered) on the queue. */
- public int getMessageCount()
- {
- return _deliveryMgr.getQueueMessageCount();
- }
+ VirtualHost getVirtualHost();
- /** @return List of messages(undelivered) on the queue. */
- public List<QueueEntry> getMessagesOnTheQueue()
- {
- return _deliveryMgr.getMessages();
- }
- /**
- * Returns messages within the given range of message Ids.
- *
- * @param fromMessageId
- * @param toMessageId
- *
- * @return List of messages
- */
- public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
- {
- return _deliveryMgr.getMessages(fromMessageId, toMessageId);
- }
+ void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
- public long getQueueDepth()
- {
- return _deliveryMgr.getTotalMessageSize();
- }
+ void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
- /**
- * @param messageId
- *
- * @return QueueEntry with give id if exists. null if QueueEntry with given id doesn't exist.
- */
- public QueueEntry getMessageOnTheQueue(long messageId)
- {
- List<QueueEntry> list = getMessagesOnTheQueue(messageId, messageId);
- if ((list == null) || (list.size() == 0))
- {
- return null;
- }
+ List<ExchangeBinding> getExchangeBindings();
- return list.get(0);
- }
- /**
- * Moves messages from this queue to another queue, and also commits the move on the message store. Delivery activity
- * on the queues being moved between is suspended during the move.
- *
- * @param fromMessageId The first message id to move.
- * @param toMessageId The last message id to move.
- * @param queueName The queue to move the messages to.
- * @param storeContext The context of the message store under which to perform the move. This is associated with
- * the stores transactional context.
- */
- public synchronized void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext)
- {
- AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+ void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
- MessageStore fromStore = getVirtualHost().getMessageStore();
- MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
+ void unregisterSubscription(final Subscription subscription) throws AMQException;
- if (toStore != fromStore)
- {
- throw new RuntimeException("Can only move messages between queues on the same message store.");
- }
- try
- {
- // Obtain locks to prevent activity on the queues being moved between.
- startMovingMessages();
- toQueue.startMovingMessages();
-
- // Get the list of messages to move.
- List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
-
- try
- {
- fromStore.beginTran(storeContext);
-
- // Move the messages in on the message store.
- for (QueueEntry entry : foundMessagesList)
- {
- AMQMessage message = entry.getMessage();
- fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
- toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
- }
-
- // Commit and flush the move transcations.
- try
- {
- fromStore.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
- }
-
- // Move the messages on the in-memory queues.
- toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
- _deliveryMgr.removeMovedMessages(foundMessagesList);
- }
- // Abort the move transactions on move failures.
- catch (AMQException e)
- {
- try
- {
- fromStore.abortTran(storeContext);
- }
- catch (AMQException ae)
- {
- throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
- }
- }
- }
- // Release locks to allow activity on the queues being moved between to continue.
- finally
- {
- toQueue.stopMovingMessages();
- stopMovingMessages();
- }
- }
+ int getConsumerCount();
- /**
- * Copies messages on this queue to another queue, and also commits the move on the message store. Delivery activity
- * on the queues being moved between is suspended during the move.
- *
- * @param fromMessageId The first message id to move.
- * @param toMessageId The last message id to move.
- * @param queueName The queue to move the messages to.
- * @param storeContext The context of the message store under which to perform the move. This is associated with
- * the stores transactional context.
- */
- public synchronized void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext)
- {
- AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-
- MessageStore fromStore = getVirtualHost().getMessageStore();
- MessageStore toStore = toQueue.getVirtualHost().getMessageStore();
-
- if (toStore != fromStore)
- {
- throw new RuntimeException("Can only move messages between queues on the same message store.");
- }
-
- try
- {
- // Obtain locks to prevent activity on the queues being moved between.
- startMovingMessages();
- toQueue.startMovingMessages();
-
- // Get the list of messages to move.
- List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
-
- try
- {
- fromStore.beginTran(storeContext);
-
- // Move the messages in on the message store.
- for (QueueEntry entry : foundMessagesList)
- {
- AMQMessage message = entry.getMessage();
- toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
- message.takeReference();
- }
-
- // Commit and flush the move transcations.
- try
- {
- fromStore.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
- }
-
- // Move the messages on the in-memory queues.
- toQueue.enqueueMovedMessages(storeContext, foundMessagesList);
- }
- // Abort the move transactions on move failures.
- catch (AMQException e)
- {
- try
- {
- fromStore.abortTran(storeContext);
- }
- catch (AMQException ae)
- {
- throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
- }
- }
- }
- // Release locks to allow activity on the queues being moved between to continue.
- finally
- {
- toQueue.stopMovingMessages();
- stopMovingMessages();
- }
- }
-
- /**
- * Removes messages from this queue, and also commits the remove on the message store. Delivery activity
- * on the queues being moved between is suspended during the remove.
- *
- * @param fromMessageId The first message id to move.
- * @param toMessageId The last message id to move.
- * @param storeContext The context of the message store under which to perform the move. This is associated with
- * the stores transactional context.
- */
- public synchronized void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
- {
- MessageStore fromStore = getVirtualHost().getMessageStore();
-
- try
- {
- // Obtain locks to prevent activity on the queues being moved between.
- startMovingMessages();
-
- // Get the list of messages to move.
- List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
-
- try
- {
- fromStore.beginTran(storeContext);
-
- // remove the messages in on the message store.
- for (QueueEntry entry : foundMessagesList)
- {
- AMQMessage message = entry.getMessage();
- fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
- }
-
- // Commit and flush the move transcations.
- try
- {
- fromStore.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
- }
-
- // remove the messages on the in-memory queues.
- _deliveryMgr.removeMovedMessages(foundMessagesList);
- }
- // Abort the move transactions on move failures.
- catch (AMQException e)
- {
- try
- {
- fromStore.abortTran(storeContext);
- }
- catch (AMQException ae)
- {
- throw new RuntimeException("Failed to abort transaction whilst moving messages on message store.", ae);
- }
- }
- }
- // Release locks to allow activity on the queues being moved between to continue.
- finally
- {
- stopMovingMessages();
- }
- }
+ int getActiveConsumerCount();
- public void startMovingMessages()
- {
- _deliveryMgr.startMovingMessages();
- }
+ boolean isUnused();
- private void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> messageList)
- {
- _deliveryMgr.enqueueMovedMessages(storeContext, messageList);
- _totalMessagesReceived.addAndGet(messageList.size());
- }
+ boolean isEmpty();
- public void stopMovingMessages()
- {
- _deliveryMgr.stopMovingMessages();
- _deliveryMgr.processAsync(_asyncDelivery);
- }
+ int getMessageCount();
- /** @return MBean object associated with this Queue */
- public ManagedObject getManagedObject()
- {
- return _managedObject;
- }
+ int getUndeliveredMessageCount();
- public long getMaximumMessageSize()
- {
- return _maximumMessageSize;
- }
- public void setMaximumMessageSize(final long maximumMessageSize)
- {
- _maximumMessageSize = maximumMessageSize;
- if(maximumMessageSize == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
- }
- }
+ long getQueueDepth();
- public int getConsumerCount()
- {
- return _subscribers.size();
- }
+ long getReceivedMessageCount();
- public int getActiveConsumerCount()
- {
- return _subscribers.getWeight();
- }
+ long getOldestMessageArrivalTime();
- public long getReceivedMessageCount()
- {
- return _totalMessagesReceived.get();
- }
+ boolean isDeleted();
- public long getMaximumMessageCount()
- {
- return _maximumMessageCount;
- }
- public void setMaximumMessageCount(final long maximumMessageCount)
- {
- _maximumMessageCount = maximumMessageCount;
- if(maximumMessageCount == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
- }
+ int delete() throws AMQException;
+ QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
- }
+ void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
- public long getMaximumQueueDepth()
- {
- return _maximumQueueDepth;
- }
+ void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
- // Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(final long maximumQueueDepth)
- {
- _maximumQueueDepth = maximumQueueDepth;
- if(maximumQueueDepth == 0L)
- {
- _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
- }
- }
- public long getOldestMessageArrivalTime()
- {
- return _deliveryMgr.getOldestMessageArrival();
+ boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
- }
+
- /** Removes the QueueEntry from the top of the queue. */
- public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
- {
- _deliveryMgr.removeAMessageFromTop(storeContext, this);
- }
+ void addQueueDeleteTask(final Task task);
- /** removes all the messages from the queue. */
- public synchronized long clearQueue(StoreContext storeContext) throws AMQException
- {
- return _deliveryMgr.clearAllMessages(storeContext);
- }
- public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
- {
- exchange.registerQueue(routingKey, this, arguments);
- if (isDurable() && exchange.isDurable())
- {
- _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
- }
+ List<QueueEntry> getMessagesOnTheQueue();
- _bindings.addBinding(routingKey, arguments, exchange);
- }
+ List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
- public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
- {
- exchange.deregisterQueue(routingKey, this, arguments);
- if (isDurable() && exchange.isDurable())
- {
- _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
- }
+ List<Long> getMessagesOnTheQueue(int num);
- _bindings.remove(routingKey, arguments, exchange);
- }
+ List<Long> getMessagesOnTheQueue(int num, int offest);
- public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
- {
- if (incrementSubscriberCount() > 1)
- {
- if (isExclusive())
- {
- decrementSubscriberCount();
- throw new ExistingExclusiveSubscription();
- }
- else if (exclusive)
- {
- decrementSubscriberCount();
- throw new ExistingSubscriptionPreventsExclusive();
- }
+ QueueEntry getMessageOnTheQueue(long messageId);
- }
- else if (exclusive)
- {
- setExclusive(true);
- }
- if (_logger.isDebugEnabled())
- {
- _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and "
- + "consumer tag {2} with {3}", ps, channel, consumerTag, this));
- }
+ void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
+ StoreContext storeContext);
- Subscription subscription =
- _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this);
+ void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext);
- if (subscription.filtersMessages())
- {
- if (_deliveryMgr.hasQueuedMessages())
- {
- _deliveryMgr.populatePreDeliveryQueue(subscription);
- }
- }
+ void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext);
- _subscribers.addSubscriber(subscription);
- if(exclusive)
- {
- _subscribers.setExclusive(true);
- }
- subscription.start();
- }
- private boolean isExclusive()
- {
- return _isExclusive.get();
- }
+ long getMaximumMessageSize();
- private void setExclusive(boolean exclusive)
- {
- _isExclusive.set(exclusive);
- }
+ void setMaximumMessageSize(long value);
- private int incrementSubscriberCount()
- {
- return _subscriberCount.incrementAndGet();
- }
- private int decrementSubscriberCount()
- {
- return _subscriberCount.decrementAndGet();
- }
+ long getMaximumMessageCount();
- public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug(MessageFormat.format(
- "Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}",
- ps, channel, consumerTag, this));
- }
+ void setMaximumMessageCount(long value);
- _subscribers.setExclusive(false);
- Subscription removedSubscription;
- if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps,
- consumerTag)))
- == null)
- {
- throw new AMQException("Protocol session with channel " + channel + " and consumer tag " + consumerTag
- + " and protocol session key " + ps.getKey() + " not registered with queue " + this);
- }
- removedSubscription.close();
- setExclusive(false);
- decrementSubscriberCount();
+ long getMaximumQueueDepth();
- // if we are eligible for auto deletion, unregister from the queue registry
- if (_autoDelete && _subscribers.isEmpty())
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Auto-deleteing queue:" + this);
- }
-
- autodelete();
- // we need to manually fire the event to the removed subscription (which was the last one left for this
- // queue. This is because the delete method uses the subscription set which has just been cleared
- removedSubscription.queueDeleted(this);
- }
- }
+ void setMaximumQueueDepth(long value);
- public boolean isUnused()
- {
- return _subscribers.isEmpty();
- }
- public boolean isEmpty()
- {
- return !_deliveryMgr.hasQueuedMessages();
- }
+ long getMaximumMessageAge();
- public int delete(boolean checkUnused, boolean checkEmpty) throws AMQException
- {
- if (checkUnused && !_subscribers.isEmpty())
- {
- _logger.info("Will not delete " + this + " as it is in use.");
+ void setMaximumMessageAge(final long maximumMessageAge);
- return 0;
- }
- else if (checkEmpty && _deliveryMgr.hasQueuedMessages())
- {
- _logger.info("Will not delete " + this + " as it is not empty.");
- return 0;
- }
- else
- {
- delete();
+ long getMinimumAlertRepeatGap();
- return _deliveryMgr.getQueueMessageCount();
- }
- }
- public void delete() throws AMQException
- {
- if (!_deleted.getAndSet(true))
- {
- _subscribers.queueDeleted(this);
- _bindings.deregister();
- _virtualHost.getQueueRegistry().unregisterQueue(_name);
- _managedObject.unregister();
- for (Task task : _deleteTaskList)
- {
- task.doTask(this);
- }
+ void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
- _deleteTaskList.clear();
- }
- }
+ long clearQueue(StoreContext storeContext) throws AMQException;
- protected void autodelete() throws AMQException
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug(MessageFormat.format("autodeleting {0}", this));
- }
- delete();
- }
- /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
- {
- // fixme not sure what this is doing. should we be passing deliverFirst through here?
- // This code is not used so when it is perhaps it should
- _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
- try
- {
- msg.checkDeliveredToConsumer();
- updateReceivedMessageCount(msg);
- }
- catch (NoConsumersException e)
- {
- // as this message will be returned, it should be removed
- // from the queue:
- dequeue(storeContext, msg);
- }
- }*/
+ void removeExpiredIfNoSubscribers() throws AMQException;
- // public DeliveryManager getDeliveryManager()
- // {
- // return _deliveryMgr;
- // }
+ Set<NotificationCheck> getNotificationChecks();
- public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
- {
- AMQMessage msg = entry.getMessage();
- _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst);
- try
- {
- msg.checkDeliveredToConsumer();
- updateReceivedMessageCount(entry);
- }
- catch (NoConsumersException e)
- {
- // as this message will be returned, it should be removed
- // from the queue:
- dequeue(storeContext, entry);
- }
- }
+ void flushSubscription(final Subscription sub) throws AMQException;
- public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
- {
- try
- {
- entry.getMessage().dequeue(storeContext, this);
- }
- catch (MessageCleanupException e)
- {
- // Message was dequeued, but could not then be deleted
- // though it is no longer referenced. This should be very
- // rare and can be detected and cleaned up on recovery or
- // done through some form of manual intervention.
- _logger.error(e, e);
- }
- catch (AMQException e)
- {
- throw new FailedDequeueException(_name.toString(), e);
- }
- }
+ void deliverAsync(final Subscription sub);
- public void deliverAsync()
- {
- _deliveryMgr.processAsync(_asyncDelivery);
- }
+ void deliverAsync();
- protected SubscriptionManager getSubscribers()
- {
- return _subscribers;
- }
- protected void updateReceivedMessageCount(QueueEntry entry) throws AMQException
+ /**
+ * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Move to top level, used outside this class.
+ */
+ static final class ExistingExclusiveSubscription extends AMQException
{
- AMQMessage msg = entry.getMessage();
-
- if (!msg.isRedelivered())
- {
- _totalMessagesReceived.incrementAndGet();
- }
- try
- {
- _managedObject.checkForNotification(msg);
- }
- catch (JMException e)
+ public ExistingExclusiveSubscription()
{
- throw new AMQException("Unable to get notification from manage queue: " + e, e);
+ super("");
}
}
- public boolean equals(Object o)
+ /**
+ * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription
+ * already exists.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Move to top level, used outside this class.
+ */
+ static final class ExistingSubscriptionPreventsExclusive extends AMQException
{
- if (this == o)
- {
- return true;
- }
-
- if ((o == null) || (getClass() != o.getClass()))
+ public ExistingSubscriptionPreventsExclusive()
{
- return false;
+ super("");
}
-
- final AMQQueue amqQueue = (AMQQueue) o;
-
- return (_name.equals(amqQueue._name));
- }
-
- public int hashCode()
- {
- return _name.hashCode();
}
- public String toString()
- {
- return "Queue(" + _name + ")@" + System.identityHashCode(this);
- }
-
- public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
- {
- return _deliveryMgr.performGet(session, channel, acks);
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return _virtualHost.getQueueRegistry();
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- public static interface Task
+ static interface Task
{
public void doTask(AMQQueue queue) throws AMQException;
}
-
- public void addQueueDeleteTask(Task task)
- {
- _deleteTaskList.add(task);
- }
-
- public long getMinimumAlertRepeatGap()
- {
- return _minimumAlertRepeatGap;
- }
-
- public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
- {
- _minimumAlertRepeatGap = minimumAlertRepeatGap;
- }
-
- public long getMaximumMessageAge()
- {
- return _maximumMessageAge;
- }
-
- public void setMaximumMessageAge(long maximumMessageAge)
- {
- _maximumMessageAge = maximumMessageAge;
- if(maximumMessageAge == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
- }
- }
-
- public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
- {
- _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, entry);
- }
-
- public QueueEntry createEntry(AMQMessage amqMessage)
- {
- return new QueueEntry(this, amqMessage);
- }
-
- public int compareTo(Object o)
- {
- return _name.compareTo(((AMQQueue) o).getName());
- }
-
-
- public void removeExpiredIfNoSubscribers() throws AMQException
- {
- synchronized(_subscribers.getChangeLock())
- {
- if(_subscribers.isEmpty())
- {
- _deliveryMgr.removeExpired();
- }
- }
- }
-
- public final Set<NotificationCheck> getNotificationChecks()
- {
- return _notificationChecks;
- }
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Aug 14 20:40:49 2008
@@ -292,7 +292,7 @@
}
/**
- * @see org.apache.qpid.server.queue.AMQQueue#deleteMessageFromTop
+ * @see AMQQueue#deleteMessageFromTop
*/
public void deleteMessageFromTop() throws JMException
{
@@ -307,7 +307,7 @@
}
/**
- * @see org.apache.qpid.server.queue.AMQQueue#clearQueue
+ * @see AMQQueue#clearQueue
*/
public void clearQueue() throws JMException
{
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Thu Aug 14 20:40:49 2008
@@ -36,59 +36,6 @@
*/
class ExchangeBindings
{
- private static final FieldTable EMPTY_ARGUMENTS = new FieldTable();
-
- static class ExchangeBinding
- {
- private final Exchange _exchange;
- private final AMQShortString _routingKey;
- private final FieldTable _arguments;
-
- ExchangeBinding(AMQShortString routingKey, Exchange exchange)
- {
- this(routingKey, exchange, EMPTY_ARGUMENTS);
- }
-
- ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
- {
- _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
- _exchange = exchange;
- _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
- }
-
- void unbind(AMQQueue queue) throws AMQException
- {
- _exchange.deregisterQueue(_routingKey, queue, _arguments);
- }
-
- public Exchange getExchange()
- {
- return _exchange;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
-
- public int hashCode()
- {
- return (_exchange == null ? 0 : _exchange.hashCode())
- + (_routingKey == null ? 0 : _routingKey.hashCode());
- }
-
- public boolean equals(Object o)
- {
- if (!(o instanceof ExchangeBinding))
- {
- return false;
- }
- ExchangeBinding eb = (ExchangeBinding) o;
- return _exchange.equals(eb._exchange)
- && _routingKey.equals(eb._routingKey);
- }
- }
-
private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>();
private final AMQQueue _queue;
@@ -109,9 +56,9 @@
}
- public void remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
+ public boolean remove(AMQShortString routingKey, FieldTable arguments, Exchange exchange)
{
- _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments));
+ return _bindings.remove(new ExchangeBinding(routingKey, exchange, arguments));
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Thu Aug 14 20:40:49 2008
@@ -22,11 +22,10 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.Collections;
import java.util.ArrayList;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -41,32 +40,40 @@
private MessagePublishInfo _messagePublishInfo;
- private List<ContentChunk> _contentBodies = new ArrayList<ContentChunk>();
+ private List<ContentChunk> _contentBodies;
private boolean _redelivered;
private long _arrivalTime;
- public InMemoryMessageHandle()
+ private final Long _messageId;
+
+ public InMemoryMessageHandle(final Long messageId)
{
+ _messageId = messageId;
}
- public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException
{
return _contentHeaderBody;
}
- public int getBodyCount(StoreContext context, Long messageId)
+ public Long getMessageId()
+ {
+ return _messageId;
+ }
+
+ public int getBodyCount(StoreContext context)
{
return _contentBodies.size();
}
- public long getBodySize(StoreContext context, Long messageId) throws AMQException
+ public long getBodySize(StoreContext context) throws AMQException
{
- return getContentHeaderBody(context, messageId).bodySize;
+ return getContentHeaderBody(context).bodySize;
}
- public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -76,13 +83,28 @@
return _contentBodies.get(index);
}
- public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
+ public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentBody, boolean isLastContentBody)
throws AMQException
{
- _contentBodies.add(contentBody);
+ if(_contentBodies == null)
+ {
+ if(isLastContentBody)
+ {
+ _contentBodies = Collections.singletonList(contentBody);
+ }
+ else
+ {
+ _contentBodies = new ArrayList<ContentChunk>();
+ _contentBodies.add(contentBody);
+ }
+ }
+ else
+ {
+ _contentBodies.add(contentBody);
+ }
}
- public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
+ public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
{
return _messagePublishInfo;
}
@@ -98,12 +120,9 @@
_redelivered = redelivered;
}
- public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
+ public boolean isPersistent()
{
- //todo remove literal values to a constant file such as AMQConstants in common
- ContentHeaderBody chb = getContentHeaderBody(context, messageId);
- return chb.properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+ return false;
}
/**
@@ -112,26 +131,20 @@
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo messagePublishInfo,
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
throws AMQException
{
_messagePublishInfo = messagePublishInfo;
_contentHeaderBody = contentHeaderBody;
+ if(contentHeaderBody.bodySize == 0)
+ {
+ _contentBodies = Collections.EMPTY_LIST;
+ }
_arrivalTime = System.currentTimeMillis();
}
- public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
- {
- // NO OP
- }
-
- public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
- {
- // NO OP
- }
-
- public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
+ public void removeMessage(StoreContext storeContext) throws AMQException
{
// NO OP
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java Thu Aug 14 20:40:49 2008
@@ -126,7 +126,7 @@
* @param age maximum age of message.
* @throws IOException
*/
- @MBeanAttribute(name="MaximumMessageAge", description="Threshold high value for message age on thr broker")
+ @MBeanAttribute(name="MaximumMessageAge", description="Threshold high value for message age on the broker")
void setMaximumMessageAge(Long age) throws IOException;
/**
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java Thu Aug 14 20:40:49 2008
@@ -36,11 +36,11 @@
// just hardcoded for now
if (persistent)
{
- return new WeakReferenceMessageHandle(store);
+ return new WeakReferenceMessageHandle(messageId, store);
}
else
{
- return new InMemoryMessageHandle();
+ return new InMemoryMessageHandle(messageId);
}
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Thu Aug 14 20:40:49 2008
@@ -1,129 +1,138 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-
-public enum NotificationCheck
-{
-
- MESSAGE_COUNT_ALERT
- {
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
- int msgCount;
- final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
- {
- listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
- return true;
- }
- return false;
- }
- },
- MESSAGE_SIZE_ALERT(true)
- {
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
- final long maximumMessageSize = queue.getMaximumMessageSize();
- if(maximumMessageSize != 0)
- {
- // Check for threshold message size
- long messageSize = (msg == null) ? 0 : msg.getSize();
-
- if (messageSize >= maximumMessageSize)
- {
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
- return true;
- }
- }
- return false;
- }
-
- },
- QUEUE_DEPTH_ALERT
- {
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
- // Check for threshold queue depth in bytes
- final long maximumQueueDepth = queue.getMaximumQueueDepth();
-
- if(maximumQueueDepth != 0)
- {
- final long queueDepth = queue.getQueueDepth();
-
- if (queueDepth >= maximumQueueDepth)
- {
- listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
- return true;
- }
- }
- return false;
- }
-
- },
- MESSAGE_AGE_ALERT
- {
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
- {
-
- final long maxMessageAge = queue.getMaximumMessageAge();
- if(maxMessageAge != 0)
- {
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - maxMessageAge;
- final long firstArrivalTime = queue.getOldestMessageArrivalTime();
-
- if(firstArrivalTime < thresholdTime)
- {
- long oldestAge = currentTime - firstArrivalTime;
- listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
-
- return true;
- }
- }
- return false;
-
- }
-
- }
- ;
-
- private final boolean _messageSpecific;
-
- NotificationCheck()
- {
- this(false);
- }
-
- NotificationCheck(boolean messageSpecific)
- {
- _messageSpecific = messageSpecific;
- }
-
- public boolean isMessageSpecific()
- {
- return _messageSpecific;
- }
-
- abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+public enum NotificationCheck
+{
+
+ MESSAGE_COUNT_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ int msgCount;
+ final long maximumMessageCount = queue.getMaximumMessageCount();
+ if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
+ {
+ listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
+ return true;
+ }
+ return false;
+ }
+ },
+ MESSAGE_SIZE_ALERT(true)
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ final long maximumMessageSize = queue.getMaximumMessageSize();
+ if(maximumMessageSize != 0)
+ {
+ // Check for threshold message size
+ long messageSize;
+ try
+ {
+ messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+ }
+ catch (AMQException e)
+ {
+ messageSize = 0;
+ }
+
+
+ if (messageSize >= maximumMessageSize)
+ {
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ QUEUE_DEPTH_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ // Check for threshold queue depth in bytes
+ final long maximumQueueDepth = queue.getMaximumQueueDepth();
+
+ if(maximumQueueDepth != 0)
+ {
+ final long queueDepth = queue.getQueueDepth();
+
+ if (queueDepth >= maximumQueueDepth)
+ {
+ listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ MESSAGE_AGE_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+
+ final long maxMessageAge = queue.getMaximumMessageAge();
+ if(maxMessageAge != 0)
+ {
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - maxMessageAge;
+ final long firstArrivalTime = queue.getOldestMessageArrivalTime();
+
+ if(firstArrivalTime < thresholdTime)
+ {
+ long oldestAge = currentTime - firstArrivalTime;
+ listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
+
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ }
+ ;
+
+ private final boolean _messageSpecific;
+
+ NotificationCheck()
+ {
+ this(false);
+ }
+
+ NotificationCheck(boolean messageSpecific)
+ {
+ _messageSpecific = messageSpecific;
+ }
+
+ public boolean isMessageSpecific()
+ {
+ return _messageSpecific;
+ }
+
+ abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+
+}
Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
('svn:eol-style' removed)