You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/02/20 15:48:29 UTC
svn commit: r746259 - in /qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/server/ main/java/org/apache/qpid/server/ack/
main/java/org/apache/qpid/server/queue/
main/java/org/apache/qpid/server/store/
main/java/org/apache/qpid/server/subscrip...
Author: ritchiem
Date: Fri Feb 20 14:48:28 2009
New Revision: 746259
URL: http://svn.apache.org/viewvc?rev=746259&view=rev
Log:
QPID-1632 - Initial testing of reference counting and implementation of TransactionLog based reference counting
Added:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Feb 20 14:48:28 2009
@@ -251,7 +251,7 @@
}
catch (NoRouteException e)
{
- //_currentMessage.incrementReference();
+ //_currentMessage.takeReference();
_returnMessages.add(e);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Fri Feb 20 14:48:28 2009
@@ -61,7 +61,8 @@
// and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
// handler. So increment here.
- _amqMessage = payload.takeReference();
+ payload.incrementReference(1);
+ _amqMessage = payload;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Fri Feb 20 14:48:28 2009
@@ -130,7 +130,7 @@
//in memory (persistent changes will be rolled back by store)
for (QueueEntry msg : _unacked.values())
{
- msg.getMessage().takeReference();
+ msg.getMessage().incrementReference(1);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Feb 20 14:48:28 2009
@@ -126,9 +126,5 @@
boolean incrementReference(int queueCount);
- boolean incrementReference();
-
- AMQMessage takeReference();
-
boolean isReferenced();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Fri Feb 20 14:48:28 2009
@@ -78,6 +78,10 @@
final AMQProtocolSession publisher,
TransactionLog messasgeStore)
{
+ if (publisher == null)
+ {
+ throw new NullPointerException("Message Publisher cannot be null");
+ }
_messagePublishInfo = info;
_txnContext = txnContext;
_publisher = publisher;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java Fri Feb 20 14:48:28 2009
@@ -42,19 +42,19 @@
_messageId = new AtomicLong(0L);
}
- public void start()
+ public void recoveryComplete()
{
_state = State.OPEN;
}
/**
- * Only used by test as test suite is run in a single VM we need to beable to re-enable recovery mode.
- */
- protected void enableRecover()
+ * Only to be used by tests, as this will violate the principle that message IDs should not be reused.
+ */
+ public void reset()
{
_state = State.RECOVER;
+ _messageId = new AtomicLong(0L);
}
-
/**
* Normal message creation path
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java Fri Feb 20 14:48:28 2009
@@ -60,6 +60,7 @@
@Override
public void removeMessage(StoreContext storeContext) throws AMQException
{
+ _log.info("PAMQM : removing message:" + _messageId);
_transactionLog.removeMessage(storeContext, _messageId);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Feb 20 14:48:28 2009
@@ -301,10 +301,16 @@
public void dispose(final StoreContext storeContext) throws MessageCleanupException
{
+ _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state);
if(delete())
{
+ _log.info("QEI delete message:" + getMessage().getMessageId());
getMessage().decrementReference(storeContext);
}
+ else
+ {
+ _log.info("QEI delete state wrong:" + getMessage().getMessageId());
+ }
}
public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Feb 20 14:48:28 2009
@@ -918,7 +918,7 @@
{
if (!entry.isDeleted())
{
- return entry.getMessage().incrementReference();
+ return entry.getMessage().incrementReference(1);
}
}
@@ -1418,7 +1418,7 @@
}
}
- @Override
+
public void checkMessageStatus() throws AMQException
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java Fri Feb 20 14:48:28 2009
@@ -32,19 +32,17 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-/**
- * A deliverable message.
- */
+/** A deliverable message. */
public class TransientAMQMessage implements AMQMessage
{
/** Used for debugging purposes. */
- private static final Logger _log = Logger.getLogger(AMQMessage.class);
+ protected static final Logger _log = Logger.getLogger(AMQMessage.class);
private final AtomicInteger _referenceCount = new AtomicInteger(1);
@@ -58,8 +56,6 @@
protected final Long _messageId;
-
-
/** Flag to indicate that this message requires 'immediate' delivery. */
private static final byte IMMEDIATE = 0x01;
@@ -143,23 +139,23 @@
/**
* Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message
* These all need refactoring to some sort of MockAMQMessageFactory.
- */
+ */
@Deprecated
protected TransientAMQMessage(AMQMessage message) throws AMQException
{
_messageId = message.getMessageId();
- _flags = ((TransientAMQMessage)message)._flags;
+ _flags = ((TransientAMQMessage) message)._flags;
_contentHeaderBody = message.getContentHeaderBody();
_messagePublishInfo = message.getMessagePublishInfo();
}
-
/**
* Normal message creation via the MessageFactory uses this constructor
* Package scope limited as MessageFactory should be used
- * @see MessageFactory
*
* @param messageId
+ *
+ * @see MessageFactory
*/
TransientAMQMessage(Long messageId)
{
@@ -191,7 +187,6 @@
return new BodyContentIterator();
}
-
public ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;
@@ -202,32 +197,19 @@
return _messageId;
}
- /**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
- * operation.
- */
- public AMQMessage takeReference()
- {
- incrementReference(); // _referenceCount.incrementAndGet();
-
- return this;
- }
-
- public boolean incrementReference()
- {
- return incrementReference(1);
- }
-
/* Threadsafe. Increment the reference count on the message. */
public boolean incrementReference(int count)
{
- if(_referenceCount.addAndGet(count) <= 1)
+ if (_referenceCount.addAndGet(count) <= 1)
{
- _referenceCount.addAndGet(-count);
+ int newcount = _referenceCount.addAndGet(-count);
+ _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount);
return false;
}
else
{
+ _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1("
+ + _referenceCount.get() + ")");
return true;
}
@@ -247,6 +229,8 @@
int count = _referenceCount.decrementAndGet();
+ _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count);
+
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
// the message has been passed to all queues. i.e. we are
@@ -256,10 +240,11 @@
// set the reference count way below 0 so that we can detect that the message has been deleted
// this is to guard against the message being spontaneously recreated (from the mgmt console)
// by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE/2);
+ _referenceCount.set(Integer.MIN_VALUE / 2);
try
{
+ _log.debug("Reference Count hit 0, removing message");
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
// no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
@@ -268,7 +253,7 @@
catch (AMQException e)
{
// to maintain consistency, we revert the count
- incrementReference();
+ incrementReference(1);
throw new MessageCleanupException(getMessageId(), e);
}
}
@@ -282,7 +267,6 @@
}
}
-
/**
* Called selectors to determin if the message has already been sent
*
@@ -296,10 +280,10 @@
/**
* Called to enforce the 'immediate' flag.
*
- * @returns true if the message is marked for immediate delivery but has not been marked as delivered
- * to a consumer
+ * @returns true if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
*/
- public boolean immediateAndNotDelivered()
+ public boolean immediateAndNotDelivered()
{
return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
@@ -335,7 +319,6 @@
_flags |= DELIVERED_TO_CONSUMER;
}
-
public long getSize()
{
return _contentHeaderBody.bodySize;
@@ -345,7 +328,7 @@
{
return _sessionIdentifier.getSessionInstance();
}
-
+
public Object getPublisherIdentifier()
{
return _sessionIdentifier.getSessionIdentifier();
@@ -356,7 +339,7 @@
_sessionIdentifier = sessionIdentifier;
}
- /** From AMQMessageHandle **/
+ /** From AMQMessageHandle * */
public int getBodyCount()
{
@@ -365,7 +348,7 @@
public ContentChunk getContentChunk(int index)
{
- if(_contentBodies == null)
+ if (_contentBodies == null)
{
throw new RuntimeException("No ContentBody has been set");
}
@@ -381,9 +364,9 @@
public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
throws AMQException
{
- if(_contentBodies == null)
+ if (_contentBodies == null)
{
- if(isLastContentBody)
+ if (isLastContentBody)
{
_contentBodies = Collections.singletonList(contentChunk);
}
@@ -411,9 +394,10 @@
/**
* This is called when all the content has been received.
+ *
* @param storeContext
- *@param messagePublishInfo
- * @param contentHeaderBody @throws AMQException
+ * @param messagePublishInfo
+ * @param contentHeaderBody @throws AMQException
*/
public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
ContentHeaderBody contentHeaderBody)
@@ -425,7 +409,7 @@
throw new NullPointerException("HeaderBody cannot be null");
}
- if( messagePublishInfo == null)
+ if (messagePublishInfo == null)
{
throw new NullPointerException("PublishInfo cannot be null");
}
@@ -433,15 +417,14 @@
_messagePublishInfo = messagePublishInfo;
_contentHeaderBody = contentHeaderBody;
-
- if( contentHeaderBody.bodySize == 0)
+ if (contentHeaderBody.bodySize == 0)
{
_contentBodies = Collections.EMPTY_LIST;
- }
+ }
_arrivalTime = System.currentTimeMillis();
- if(messagePublishInfo.isImmediate())
+ if (messagePublishInfo.isImmediate())
{
_flags |= IMMEDIATE;
}
@@ -457,7 +440,6 @@
//no-op
}
-
public String toString()
{
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Fri Feb 20 14:48:28 2009
@@ -1357,7 +1357,7 @@
if(message != null)
{
- message.incrementReference();
+ message.incrementReference(1);
}
else
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Fri Feb 20 14:48:28 2009
@@ -594,6 +594,7 @@
protected void sendToClient(final QueueEntry entry, final long deliveryTag)
throws AMQException
{
+ _logger.info("Sending Message(" + entry + ") DTag:" + deliveryTag + " to subscription:" + debugIdentity());
_deliveryMethod.deliverToClient(this,entry,deliveryTag);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Fri Feb 20 14:48:28 2009
@@ -92,7 +92,7 @@
public void process() throws AMQException
{
- _message.incrementReference();
+ _message.incrementReference(1);
try
{
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Feb 20 14:48:28 2009
@@ -238,6 +238,13 @@
" does not.");
}
_transactionLog = (TransactionLog) o;
+
+ //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable.
+ if (_transactionLog instanceof RoutingTable)
+ {
+ _routingTable = (RoutingTable)_transactionLog;
+ }
+
_transactionLog.configure(this, "store", config);
}
@@ -261,9 +268,9 @@
}
else
{
- if (_transactionLog instanceof RoutingTable)
+ if (_routingTable == null)
{
- _routingTable = (RoutingTable)_transactionLog;
+ throw new RuntimeException("No Routing Table configured unable to startup.");
}
}
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java Fri Feb 20 14:48:28 2009
@@ -242,9 +242,9 @@
}
- public boolean incrementReference()
+ public boolean incrementReference(int count)
{
- _count++;
+ _count+=count;
return true;
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java Fri Feb 20 14:48:28 2009
@@ -29,16 +29,15 @@
public void setUp()
{
_factory = MessageFactory.getInstance();
-
+ _factory.reset();
}
public void test()
{
- AMQMessage message = _factory.createMessage(null, false);
-
- _factory.enableRecover();
- Long messasgeID = message.getMessageId();
+ Long messasgeID = 1L;
+ //Create initial message
+ _factory.createMessage(messasgeID, null);
try
{
@@ -67,7 +66,7 @@
messasgeID += 100;
try
{
- message = _factory.createMessage(messasgeID, null);
+ AMQMessage message = _factory.createMessage(messasgeID, null);
assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
}
catch (Exception re)
@@ -76,7 +75,7 @@
}
// End the reovery process.
- _factory.start();
+ _factory.recoveryComplete();
//Check we cannot still create by id after ending recovery phase
try
@@ -96,7 +95,7 @@
try
{
- message = _factory.createMessage(null, false);
+ AMQMessage message = _factory.createMessage(null, false);
assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
}
catch (Exception re)
Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java?rev=746259&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java Fri Feb 20 14:48:28 2009
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+
+public class MessageReferenceCountingTest extends TestCase
+{
+ AMQMessage _message;
+
+ public void setUp()
+ {
+ _message = MessageFactory.getInstance().createMessage(null, false);
+ }
+
+ public void testInitialState()
+ {
+
+ assertTrue("New messages should have a reference", _message.isReferenced());
+ }
+
+ public void testIncrementReference()
+ {
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ assertTrue("Incrementing should be allowed ",_message.incrementReference(1));
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1));
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2));
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ }
+
+ public void testDecrementReference()
+ {
+ assertTrue("Message should maintain Referenced state", _message.isReferenced());
+ try
+ {
+ _message.decrementReference(null);
+ }
+ catch (MessageCleanupException e)
+ {
+ fail("Decrement should be allowed:"+e.getMessage());
+ }
+
+ assertFalse("Message should not be Referenced state", _message.isReferenced());
+
+ try
+ {
+ _message.decrementReference(null);
+ fail("Decrement should not be allowed as we should have a ref count of 0");
+ }
+ catch (MessageCleanupException e)
+ {
+ assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0"));
+ }
+
+ }
+
+}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Fri Feb 20 14:48:28 2009
@@ -20,193 +20,22 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.subscription.Subscription;
-
-public class MockQueueEntry implements QueueEntry
+public class MockQueueEntry extends QueueEntryImpl
{
+ static SimpleQueueEntryList _defaultList = new SimpleQueueEntryList(new MockAMQQueue("MockQueueEntry_DefaultQueue"));
- private AMQMessage _message;
- private boolean _redelivered;
-
- public boolean acquire()
- {
- return false;
- }
-
- public boolean acquire(Subscription sub)
- {
- return false;
- }
-
- public boolean acquiredBySubscription()
- {
- return false;
- }
-
- public void addStateChangeListener(StateChangeListener listener)
- {
-
- }
-
- public String debugIdentity()
- {
- return null;
- }
-
- public boolean delete()
- {
- return false;
- }
-
- public void dequeue(StoreContext storeContext) throws FailedDequeueException
- {
-
- }
-
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
- {
-
- }
-
- public void dispose(StoreContext storeContext) throws MessageCleanupException
- {
-
- }
-
- public boolean expired() throws AMQException
- {
- return false;
- }
-
- public Subscription getDeliveredSubscription()
- {
- return null;
- }
-
- public boolean getDeliveredToConsumer()
- {
- return false;
- }
-
- public AMQMessage getMessage()
- {
- return _message;
- }
-
- public AMQQueue getQueue()
- {
- return null;
- }
-
- public long getSize()
- {
- return 0;
- }
-
- public boolean immediateAndNotDelivered()
- {
- return false;
- }
-
- public boolean isAcquired()
- {
- return false;
- }
-
- public boolean isDeleted()
- {
- return false;
- }
-
-
- public boolean isQueueDeleted()
- {
-
- return false;
- }
-
-
- public boolean isRejectedBy(Subscription subscription)
- {
-
- return false;
- }
-
-
- public void reject()
- {
-
-
- }
-
-
- public void reject(Subscription subscription)
- {
-
-
- }
-
-
- public void release()
- {
-
-
- }
-
-
- public boolean removeStateChangeListener(StateChangeListener listener)
- {
-
- return false;
- }
-
-
- public void requeue(StoreContext storeContext) throws AMQException
- {
-
-
- }
-
-
- public void setDeliveredToSubscription()
- {
-
-
- }
-
-
- public void setRedelivered(boolean redelivered)
- {
- _redelivered = redelivered;
- }
-
-
- public int compareTo(QueueEntry o)
- {
-
- return 0;
- }
-
- public void setMessage(AMQMessage msg)
- {
- _message = msg;
- }
-
- public ContentHeaderBody getContentHeaderBody() throws AMQException
+ public MockQueueEntry()
{
- return _message.getContentHeaderBody();
+ super(_defaultList);
}
- public boolean isPersistent() throws AMQException
+ public MockQueueEntry(SimpleQueueEntryList queueEntryList, AMQMessage message)
{
- return _message.isPersistent();
+ super(queueEntryList, message);
}
- public boolean isRedelivered()
+ public MockQueueEntry(AMQMessage message)
{
- return _redelivered;
+ super(_defaultList, message);
}
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Feb 20 14:48:28 2009
@@ -296,7 +296,7 @@
public void testGetLastFiveMessageIds() throws Exception
{
AMQMessage message = createMessage();
- Long messageIdOffset = message.getMessageId() -1 ;
+ Long messageIdOffset = message.getMessageId() - 1;
for (int i = 0; i < 10; i++)
{
// Put message on queue
@@ -335,7 +335,6 @@
msg.enqueue(qs);
msg.routingComplete(_store);
-
_store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
@@ -343,14 +342,13 @@
assertNotNull(data);
// Dequeue message
- MockQueueEntry entry = new MockQueueEntry();
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
- entry.setMessage(message);
+ MockQueueEntry entry = new MockQueueEntry(message);
_queue.dequeue(null, entry);
// Check that it is dequeued
@@ -408,9 +406,9 @@
_tag = getMessageId();
}
- public boolean incrementReference()
+ public boolean incrementReference(int count)
{
- _count++;
+ _count+=count;
return true;
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Fri Feb 20 14:48:28 2009
@@ -59,7 +59,7 @@
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message = message.takeReference();
+ message.incrementReference(1);
// we call routing complete to set up the handle
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
@@ -89,10 +89,10 @@
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- message = message.takeReference();
+ message.incrementReference(1);
assertEquals(1, _store.getMessageMetaDataMap().size());
- message = message.takeReference();
+ message.incrementReference(1);
message.decrementReference(_storeContext);
assertEquals(1, _store.getMessageMetaDataMap().size());
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org