You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/25 23:59:05 UTC
svn commit: r829675 [9/11] - in /qpid/trunk/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/ broker/bin/ broker/src/main/java/org/apac...
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Sun Oct 25 22:58:57 2009
@@ -25,7 +25,6 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
public class AMQQueueFactoryTest extends TestCase
{
@@ -55,31 +54,18 @@
FieldTable fieldTable = new FieldTable();
fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5);
- try
- {
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
- _virtualHost, fieldTable);
-
- assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
+ _virtualHost, fieldTable);
+
+ assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
}
public void testSimpleQueueRegistration()
{
- try
- {
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
- _virtualHost, null);
- assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
+ _virtualHost, null);
+ assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
}
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Sun Oct 25 22:58:57 2009
@@ -29,7 +29,10 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -38,19 +41,15 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.mina.common.ByteBuffer;
+import org.apache.commons.configuration.PropertiesConfiguration;
import javax.management.JMException;
import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Collections;
/**
* Test class to test AMQQueueMBean attribtues and operations
@@ -61,8 +60,6 @@
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private MessageStore _messageStore;
- private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext;
private VirtualHost _virtualHost;
private AMQProtocolSession _protocolSession;
private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
@@ -108,7 +105,7 @@
//Ensure that the data has been removed from the Store
verifyBrokerState();
}
-
+
public void testDeleteMessages() throws Exception
{
int messageCount = 10;
@@ -129,9 +126,9 @@
}
catch(Exception e)
{
-
+
}
-
+
//delete last message, leaving 2nd to 9th
_queueMBean.deleteMessages(10L,10L);
assertTrue(_queueMBean.getMessageCount() == (messageCount - 2));
@@ -143,7 +140,7 @@
}
catch(Exception e)
{
-
+
}
//delete remaining messages, leaving none
@@ -159,18 +156,16 @@
private void verifyBrokerState()
{
- TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore)_virtualHost.getMessageStore();
// Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
- assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
- assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
- assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
- assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+
+ assertEquals("Store should have no messages:" + store.getMessageCount(), 0, store.getMessageCount());
}
public void testConsumerCount() throws AMQException
{
-
+
assertTrue(_queue.getActiveConsumerCount() == 0);
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
@@ -182,7 +177,7 @@
Subscription subscription =
SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), protocolSession, new AMQShortString("test"), false, null, false, channel.getCreditManager());
-
+
_queue.registerSubscription(subscription, false);
assertEquals(1,(int)_queueMBean.getActiveConsumerCount());
@@ -225,7 +220,6 @@
assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth));
assertTrue(_queueMBean.getName().equals("testQueue"));
- assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
assertFalse(_queueMBean.isAutoDelete());
assertFalse(_queueMBean.isDurable());
}
@@ -261,7 +255,7 @@
{
}
-
+
try
{
long end = Integer.MAX_VALUE;
@@ -275,17 +269,22 @@
}
IncomingMessage msg = message(false, false);
- long id = msg.getMessageId();
- _queue.clearQueue(_storeContext);
+ _queue.clearQueue();
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, new MessageHandleFactory());
+ MessageMetaData mmd = msg.headersReceived();
+ msg.setStoredMessage(_messageStore.addMessage(mmd));
+ long id = msg.getMessageNumber();
msg.addContentBodyFrame(new ContentChunk()
{
ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
+ {
+ _data.limit((int)MESSAGE_SIZE);
+ }
+
public int getSize()
{
return (int) MESSAGE_SIZE;
@@ -301,7 +300,12 @@
}
});
- msg.deliverToQueues();
+
+ AMQMessage m = new AMQMessage(msg.getStoredMessage());
+ for(AMQQueue q : msg.getDestinationQueues())
+ {
+ q.enqueue(m);
+ }
// _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
_queueMBean.viewMessageContent(id);
try
@@ -350,7 +354,7 @@
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ IncomingMessage msg = new IncomingMessage(publish);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
@@ -360,15 +364,17 @@
protected void setUp() throws Exception
{
super.setUp();
- IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+ IApplicationRegistry applicationRegistry = new TestApplicationRegistry(new ServerConfiguration(configuration));
+ ApplicationRegistry.initialise(applicationRegistry );
+
+ configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
_messageStore = _virtualHost.getMessageStore();
- _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>()
- );
-
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost,
null);
_queueMBean = new AMQQueueMBean(_queue);
@@ -391,7 +397,8 @@
currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
+ MessageMetaData mmd = currentMessage.headersReceived();
+ currentMessage.setStoredMessage(_messageStore.addMessage(mmd));
// Add the body so we have somthing to test later
currentMessage.addContentBodyFrame(
@@ -400,7 +407,12 @@
.convertToContentChunk(
new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
MESSAGE_SIZE)));
- currentMessage.deliverToQueues();
+
+ AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
+ for(AMQQueue q : currentMessage.getDestinationQueues())
+ {
+ q.enqueue(m);
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Sun Oct 25 22:58:57 2009
@@ -28,7 +28,10 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,15 +42,9 @@
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.NullApplicationRegistry;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.Set;
-import java.util.Collections;
/**
* Tests that acknowledgements are handled correctly.
@@ -62,8 +59,6 @@
private TestMemoryMessageStore _messageStore;
- private StoreContext _storeContext = new StoreContext();
-
private AMQChannel _channel;
private AMQQueue _queue;
@@ -99,11 +94,7 @@
private void publishMessages(int count, boolean persistent) throws AMQException
{
- TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
- new LinkedList<RequiredDeliveryException>()
- );
_queue.registerSubscription(_subscription,false);
- MessageHandleFactory factory = new MessageHandleFactory();
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -136,31 +127,50 @@
return new AMQShortString("rk");
}
};
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+ final IncomingMessage msg = new IncomingMessage(publishBody);
//IncomingMessage msg2 = null;
+ BasicContentHeaderProperties b = new BasicContentHeaderProperties();
+ ContentHeaderBody cb = new ContentHeaderBody();
+ cb.properties = b;
+
if (persistent)
{
- BasicContentHeaderProperties b = new BasicContentHeaderProperties();
//This is DeliveryMode.PERSISTENT
b.setDeliveryMode((byte) 2);
- ContentHeaderBody cb = new ContentHeaderBody();
- cb.properties = b;
- msg.setContentHeaderBody(cb);
- }
- else
- {
- msg.setContentHeaderBody(new ContentHeaderBody());
}
+
+ msg.setContentHeaderBody(cb);
+
// we increment the reference here since we are not delivering the messaging to any queues, which is where
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, factory);
+ MessageMetaData mmd = msg.headersReceived();
+ msg.setStoredMessage(_messageStore.addMessage(mmd));
if(msg.allContentReceived())
{
- msg.deliverToQueues();
+ ServerTransaction txn = new AutoCommitTransaction(_messageStore);
+ txn.enqueue(_queue, msg, new ServerTransaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ _queue.enqueue(new AMQMessage(msg.getStoredMessage()));
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
+
}
// we manually send the message to the subscription
//_subscription.send(new QueueEntry(_queue,msg), _queue);
@@ -178,8 +188,7 @@
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+ assertEquals("",msgCount,map.size());
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -191,8 +200,6 @@
assertTrue(unackedMsg.getQueue() == _queue);
}
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
}
/**
@@ -207,8 +214,8 @@
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
- assertTrue(_messageStore.getContentBodyMap().size() == 0);
+ assertTrue(_messageStore.getMessageCount() == 0);
+
}
@@ -224,8 +231,8 @@
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
- assertTrue(_messageStore.getContentBodyMap().size() == 0);
+ assertTrue(_messageStore.getMessageCount() == 0);
+
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Sun Oct 25 22:58:57 2009
@@ -20,25 +20,18 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.AMQMessage;
public class MockAMQMessage extends AMQMessage
{
public MockAMQMessage(long messageId)
throws AMQException
{
- super(new MockAMQMessageHandle(messageId) ,
- (StoreContext)null,
- (MessagePublishInfo)new MockMessagePublishInfo());
+ super(new MockStoredMessage(messageId));
}
- protected MockAMQMessage(AMQMessage msg)
- throws AMQException
- {
- super(msg);
- }
+
@Override
@@ -46,4 +39,5 @@
{
return 0l;
}
+
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Sun Oct 25 22:58:57 2009
@@ -23,23 +23,19 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
import java.util.List;
import java.util.Set;
import java.util.Map;
-import java.util.HashMap;
-import java.util.LinkedList;
public class MockAMQQueue implements AMQQueue
{
@@ -47,6 +43,10 @@
private AMQShortString _name;
private VirtualHost _virtualhost;
+ private PrincipalHolder _principalHolder;
+
+ private Object _exclusiveOwner;
+
public MockAMQQueue(String name)
{
_name = new AMQShortString(name);
@@ -57,6 +57,11 @@
return _name;
}
+ public void setNoLocal(boolean b)
+ {
+
+ }
+
public boolean isDurable()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -162,17 +167,22 @@
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
+ public QueueEntry enqueue(ServerMessage message) throws AMQException
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
+ public void requeue(QueueEntry entry)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
+ public void requeue(QueueEntryImpl storeContext, Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeue(QueueEntry entry)
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -211,23 +221,23 @@
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
-
+
public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
{
return null;
}
- public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext)
+ public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext)
+ public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
+ public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -286,16 +296,16 @@
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
+ public void deleteMessageFromTop()
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public long clearQueue(StoreContext storeContext) throws AMQException
+ public long clearQueue()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
-
+
public void checkMessageStatus() throws AMQException
{
@@ -327,8 +337,28 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isExclusive()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Exchange getAlternateExchange()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void checkCapacity(AMQChannel channel)
- {
+ {
}
public ManagedObject getManagedObject()
@@ -343,7 +373,7 @@
public void setMinimumAlertRepeatGap(long value)
{
-
+
}
public long getCapacity()
@@ -368,7 +398,32 @@
public void configure(QueueConfiguration config)
{
-
+
+ }
+
+ public PrincipalHolder getPrincipalHolder()
+ {
+ return _principalHolder;
}
+ public void setPrincipalHolder(PrincipalHolder principalHolder)
+ {
+ _principalHolder = principalHolder;
+ }
+
+ public Object getExclusiveOwner()
+ {
+ return _exclusiveOwner;
+ }
+
+ public void setExclusiveOwner(Object exclusiveOwner)
+ {
+ _exclusiveOwner = exclusiveOwner;
+ }
+
+
+ public String getResourceName()
+ {
+ return _name.toString();
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Sun Oct 25 22:58:57 2009
@@ -21,8 +21,9 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
public class MockQueueEntry implements QueueEntry
{
@@ -44,14 +45,14 @@
return false;
}
- public void addStateChangeListener(StateChangeListener listener)
+ public boolean isAcquiredBy(Subscription subscription)
{
-
+ return false;
}
- public String debugIdentity()
+ public void addStateChangeListener(StateChangeListener listener)
{
- return null;
+
}
public boolean delete()
@@ -59,17 +60,22 @@
return false;
}
- public void dequeue(StoreContext storeContext) throws FailedDequeueException
+ public void dequeue()
+ {
+
+ }
+
+ public void discard()
{
}
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void routeToAlternate()
{
}
- public void dispose(StoreContext storeContext) throws MessageCleanupException
+ public void dispose()
{
}
@@ -119,70 +125,95 @@
return false;
}
-
+
public boolean isQueueDeleted()
{
return false;
}
-
+
public boolean isRejectedBy(Subscription subscription)
{
return false;
}
-
+
public void reject()
{
}
-
+
public void reject(Subscription subscription)
{
}
-
+
public void release()
{
}
-
+ public boolean releaseButRetain()
+ {
+ return false;
+ }
+
+
public boolean removeStateChangeListener(StateChangeListener listener)
{
return false;
}
-
- public void requeue(StoreContext storeContext) throws AMQException
+
+ public void requeue()
{
}
-
+ public void requeue(Subscription subscription)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
public void setDeliveredToSubscription()
{
}
-
- public void setRedelivered(boolean b)
+
+ public void setRedelivered()
+ {
+
+
+ }
+
+ public AMQMessageHeader getMessageHeader()
{
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ public boolean isPersistent()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+ public boolean isRedelivered()
+ {
+ return false;
}
-
+
public int compareTo(QueueEntry o)
{
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Sun Oct 25 22:58:57 2009
@@ -1,6 +1,6 @@
package org.apache.qpid.server.queue;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,16 +8,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
@@ -31,21 +31,21 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
-import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
public class SimpleAMQQueueTest extends TestCase
{
@@ -59,7 +59,7 @@
protected DirectExchange _exchange = new DirectExchange();
protected MockSubscription _subscription = new MockSubscription();
protected FieldTable _arguments = null;
-
+
MessagePublishInfo info = new MessagePublishInfo()
{
@@ -88,7 +88,7 @@
return null;
}
};
-
+
@Override
protected void setUp() throws Exception
{
@@ -97,7 +97,7 @@
ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance();
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store);
+ _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), env), _store);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -119,51 +119,51 @@
}
catch (IllegalArgumentException e)
{
- assertTrue("Exception was not about missing name",
+ assertTrue("Exception was not about missing name",
e.getMessage().contains("name"));
}
-
+
try {
_queue = new SimpleAMQQueue(_qname, false, _owner, false, null);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
{
- assertTrue("Exception was not about missing vhost",
+ assertTrue("Exception was not about missing vhost",
e.getMessage().contains("Host"));
}
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
+ _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false,
_virtualHost, _arguments);
assertNotNull("Queue was not created", _queue);
}
-
+
public void testGetVirtualHost()
{
assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
}
-
+
public void testBinding()
{
try
{
_queue.bind(_exchange, _routingKey, null);
- assertTrue("Routing key was not bound",
+ assertTrue("Routing key was not bound",
_exchange.getBindings().containsKey(_routingKey));
- assertEquals("Queue was not bound to key",
+ assertEquals("Queue was not bound to key",
_exchange.getBindings().get(_routingKey).get(0),
_queue);
- assertEquals("Exchange binding count", 1,
+ assertEquals("Exchange binding count", 1,
_queue.getExchangeBindings().size());
- assertEquals("Wrong exchange bound", _routingKey,
+ assertEquals("Wrong exchange bound", _routingKey,
_queue.getExchangeBindings().get(0).getRoutingKey());
- assertEquals("Wrong exchange bound", _exchange,
+ assertEquals("Wrong exchange bound", _exchange,
_queue.getExchangeBindings().get(0).getExchange());
-
+
_queue.unBind(_exchange, _routingKey, null);
- assertFalse("Routing key was still bound",
+ assertFalse("Routing key was still bound",
_exchange.getBindings().containsKey(_routingKey));
- assertNull("Routing key was not empty",
+ assertNull("Routing key was not empty",
_exchange.getBindings().get(_routingKey));
}
catch (AMQException e)
@@ -171,61 +171,61 @@
assertNull("Unexpected exception", e);
}
}
-
+
public void testSubscription() throws AMQException
{
// Check adding a subscription adds it to the queue
_queue.registerSubscription(_subscription, false);
- assertEquals("Subscription did not get queue", _queue,
+ assertEquals("Subscription did not get queue", _queue,
_subscription.getQueue());
- assertEquals("Queue does not have consumer", 1,
+ assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
- assertEquals("Queue does not have active consumer", 1,
+ assertEquals("Queue does not have active consumer", 1,
_queue.getActiveConsumerCount());
-
+
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-
+ _queue.enqueue(messageA);
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
assertTrue("Subscription still had queue", _subscription.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
- assertFalse("Queue still has active consumer",
+ assertFalse("Queue still has active consumer",
1 == _queue.getActiveConsumerCount());
-
+
AMQMessage messageB = createMessage(new Long (25));
- _queue.enqueue(null, messageB);
- QueueEntry entry = _subscription.getLastSeenEntry();
- assertNull(entry);
+ _queue.enqueue(messageB);
+ assertNull(_subscription.getQueueContext());
+
}
-
+
public void testQueueNoSubscriber() throws AMQException, InterruptedException
{
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
+ _queue.enqueue(messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
}
public void testExclusiveConsumer() throws AMQException
{
// Check adding an exclusive subscription adds it to the queue
_queue.registerSubscription(_subscription, true);
- assertEquals("Subscription did not get queue", _queue,
+ assertEquals("Subscription did not get queue", _queue,
_subscription.getQueue());
- assertEquals("Queue does not have consumer", 1,
+ assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
- assertEquals("Queue does not have active consumer", 1,
+ assertEquals("Queue does not have active consumer", 1,
_queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
- assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
-
+ _queue.enqueue(messageA);
+ assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+
// Check we cannot add a second subscriber to the queue
Subscription subB = new MockSubscription();
Exception ex = null;
@@ -235,12 +235,12 @@
}
catch (AMQException e)
{
- ex = e;
+ ex = e;
}
assertNotNull(ex);
assertTrue(ex instanceof AMQException);
- // Check we cannot add an exclusive subscriber to a queue with an
+ // Check we cannot add an exclusive subscriber to a queue with an
// existing subscription
_queue.unregisterSubscription(_subscription);
_queue.registerSubscription(_subscription, false);
@@ -250,35 +250,35 @@
}
catch (AMQException e)
{
- ex = e;
+ ex = e;
}
assertNotNull(ex);
}
-
- public void testAutoDeleteQueue() throws Exception
+
+ public void testAutoDeleteQueue() throws Exception
{
_queue.stop();
- _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
+ _queue = new SimpleAMQQueue(_qname, false, null, true, _virtualHost);
_queue.registerSubscription(_subscription, false);
AMQMessage message = createMessage(new Long(25));
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
_queue.unregisterSubscription(_subscription);
assertTrue("Queue was not deleted when subscription was removed",
_queue.isDeleted());
}
-
+
public void testResend() throws Exception
{
_queue.registerSubscription(_subscription, false);
Long id = new Long(26);
AMQMessage message = createMessage(id);
- _queue.enqueue(null, message);
- QueueEntry entry = _subscription.getLastSeenEntry();
- entry.setRedelivered(true);
+ _queue.enqueue(message);
+ QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
+ entry.setRedelivered();
_queue.resend(entry, _subscription);
-
+
}
-
+
public void testGetFirstMessageId() throws Exception
{
// Create message
@@ -286,7 +286,7 @@
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
// Get message id
Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
@@ -302,7 +302,7 @@
Long messageId = new Long(i);
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -323,7 +323,7 @@
Long messageId = new Long(i);
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -335,7 +335,7 @@
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
-
+
public void testGetMessagesRangeOnTheQueue() throws Exception
{
for (int i = 1 ; i <= 10; i++)
@@ -344,142 +344,138 @@
Long messageId = new Long(i);
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
}
-
+
// Get non-existent 0th QueueEntry & check returned list was empty
// (the position parameters in this method are indexed from 1)
List<QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
assertTrue(entries.size() == 0);
-
+
// Check that when 'from' is 0 it is ignored and the range continues from 1
entries = _queue.getMessagesRangeOnTheQueue(0, 2);
assertTrue(entries.size() == 2);
- long msgID = entries.get(0).getMessage().getMessageId();
+ long msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 1L);
- msgID = entries.get(1).getMessage().getMessageId();
+ msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 2L);
// Check that when 'from' is greater than 'to' the returned list is empty
entries = _queue.getMessagesRangeOnTheQueue(5, 4);
assertTrue(entries.size() == 0);
-
- // Get first QueueEntry & check id
+
+ // Get first QueueEntry & check id
entries = _queue.getMessagesRangeOnTheQueue(1, 1);
assertTrue(entries.size() == 1);
- msgID = entries.get(0).getMessage().getMessageId();
+ msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 1L);
-
+
// Get 5th,6th,7th entries and check id's
entries = _queue.getMessagesRangeOnTheQueue(5, 7);
assertTrue(entries.size() == 3);
- msgID = entries.get(0).getMessage().getMessageId();
+ msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 5L);
- msgID = entries.get(1).getMessage().getMessageId();
+ msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 6L);
- msgID = entries.get(2).getMessage().getMessageId();
+ msgID = entries.get(2).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 7L);
-
+
// Get 10th QueueEntry & check id
entries = _queue.getMessagesRangeOnTheQueue(10, 10);
assertTrue(entries.size() == 1);
- msgID = entries.get(0).getMessage().getMessageId();
+ msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 10L);
-
+
// Get non-existent 11th QueueEntry & check returned set was empty
entries = _queue.getMessagesRangeOnTheQueue(11, 11);
assertTrue(entries.size() == 0);
-
+
// Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs
entries = _queue.getMessagesRangeOnTheQueue(9, 11);
assertTrue(entries.size() == 2);
- msgID = entries.get(0).getMessage().getMessageId();
+ msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 9L);
- msgID = entries.get(1).getMessage().getMessageId();
+ msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 10L);
}
-
+
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
+ final IncomingMessage msg = new IncomingMessage(info);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
- ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
-
+
+ final ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
// Send persistent message
+
qs.add(_queue);
- msg.enqueue(qs);
- msg.routingComplete(_store, new MessageHandleFactory());
- _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
-
+ MessageMetaData metaData = msg.headersReceived();
+ StoredMessage handle = _store.addMessage(metaData);
+ msg.setStoredMessage(handle);
+
+
+ ServerTransaction txn = new AutoCommitTransaction(_store);
+
+ txn.enqueue(qs, msg, new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ msg.enqueue(qs);
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+
+
+
// Check that it is enqueued
AMQQueue data = _store.getMessages().get(1L);
- assertNotNull(data);
-
+ assertNull(data);
+
// Dequeue message
MockQueueEntry entry = new MockQueueEntry();
- AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext);
-
+ AMQMessage amqmsg = new AMQMessage(handle);
+
entry.setMessage(amqmsg);
- _queue.dequeue(null, entry);
-
+ _queue.dequeue(entry);
+
// Check that it is dequeued
data = _store.getMessages().get(1L);
assertNull(data);
}
- // FIXME: move this to somewhere useful
- private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
- {
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- null,
- false);
- try
- {
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
- publishBody,
- new ContentHeaderBody()
- {
- public int getSize()
- {
- return 1;
- }
- });
- }
- catch (AMQException e)
- {
- // won't happen
- }
-
-
- return amqMessageHandle;
- }
-
public class TestMessage extends AMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody)
throws AMQException
{
- super(createMessageHandle(messageId, publishBody), storeContext, publishBody);
+ this(tag, messageId, publishBody, new ContentHeaderBody(1, 1, new BasicContentHeaderProperties(), 0));
+
+ }
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, ContentHeaderBody chb)
+ throws AMQException
+ {
+ super(new MockStoredMessage(messageId, publishBody, chb));
_tag = tag;
}
-
public boolean incrementReference()
{
_count++;
return true;
}
- public void decrementReference(StoreContext context)
+ public void decrementReference()
{
_count--;
}
@@ -489,10 +485,10 @@
assertEquals("Wrong count for message with tag " + _tag, expected, _count);
}
}
-
+
protected AMQMessage createMessage(Long id) throws AMQException
{
- AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
+ AMQMessage messageA = new TestMessage(id, id, info);
return messageA;
}
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java Sun Oct 25 22:58:57 2009
@@ -27,8 +27,6 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.AMQException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class SimpleAMQQueueThreadPoolTest extends TestCase
{
@@ -47,7 +45,7 @@
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
-
+
queue.stop();
assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount());
@@ -55,6 +53,6 @@
finally
{
ApplicationRegistry.remove();
- }
+ }
}
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java Sun Oct 25 22:58:57 2009
@@ -57,11 +57,11 @@
BufferedWriter out = new BufferedWriter(new FileWriter(tmpFile));
out.write("<security><queueDenier>notyet</queueDenier><exchangeDenier>yes</exchangeDenier></security>");
out.close();
-
+
_conf = new SecurityConfiguration(new XMLConfiguration(tmpFile));
-
+
// Create ACLManager
-
+
_pluginManager = new MockPluginManager("");
_authzManager = new ACLManager(_conf, _pluginManager);
@@ -79,15 +79,15 @@
// Correctly Close the AR we created
ApplicationRegistry.remove();
super.tearDown();
- }
-
+ }
+
public void testACLManagerConfigurationPluginManager() throws Exception
{
AMQQueue queue = new MockAMQQueue("notyet");
AMQQueue otherQueue = new MockAMQQueue("other");
-
+
assertFalse(_authzManager.authoriseDelete(_session, queue));
-
+
// This should only be denied if the config hasn't been correctly passed in
assertTrue(_authzManager.authoriseDelete(_session, otherQueue));
assertTrue(_authzManager.authorisePurge(_session, queue));
@@ -96,11 +96,11 @@
public void testACLManagerConfigurationPluginManagerACLPlugin() throws ConfigurationException
{
_authzManager = new ACLManager(_conf, _pluginManager, ExchangeDenier.FACTORY);
-
+
Exchange exchange = null;
assertFalse(_authzManager.authoriseDelete(_session, exchange));
}
-
+
public void testConfigurePlugins() throws ConfigurationException
{
Configuration hostConfig = new PropertiesConfiguration();
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java Sun Oct 25 22:58:57 2009
@@ -14,16 +14,16 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.security.access;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.PrincipalHolder;
public class ExchangeDenier extends AllowAll
{
@@ -40,9 +40,9 @@
return new ExchangeDenier();
}
};
-
+
@Override
- public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
+ public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
{
return AuthzResult.DENIED;
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java Sun Oct 25 22:58:57 2009
@@ -27,14 +27,13 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.amqp_0_9.ExchangeDeclareBodyImpl;
-import org.apache.qpid.framing.amqp_0_9.QueueDeclareBodyImpl;
import org.apache.qpid.framing.amqp_8_0.QueueBindBodyImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -43,7 +42,7 @@
private String _user = "user";
private PrincipalPermissions _perms;
-
+
// Common things that are passed to frame constructors
private AMQShortString _queueName = new AMQShortString(this.getClass().getName()+"queue");
private AMQShortString _tempQueueName = new AMQShortString(this.getClass().getName()+"tempqueue");
@@ -65,18 +64,18 @@
private AMQQueue _temporaryQueue;
private Boolean _temporary = false;
private Boolean _ownQueue = false;
-
+
@Override
public void setUp()
{
//Highlight that this test will cause a new AR to be created
- ApplicationRegistry.getInstance();
+ ApplicationRegistry.getInstance();
_perms = new PrincipalPermissions(_user);
- try
+ try
{
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
+ _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration("test", env));
_exchange = DirectExchange.TYPE.newInstance(_virtualHost, _exchangeName, _durable, _ticket, _autoDelete);
_queue = AMQQueueFactory.createAMQQueueImpl(_queueName, false, _owner , false, _virtualHost, _arguments);
_temporaryQueue = AMQQueueFactory.createAMQQueueImpl(_tempQueueName, false, _owner , true, _virtualHost, _arguments);
@@ -132,27 +131,29 @@
_perms.grant(Permission.CREATEQUEUE, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEQUEUE, authArgs));
}
-
+
// FIXME disabled, this fails due to grant putting the grant into the wrong map QPID-1598
public void disableTestExchangeCreate()
{
- ExchangeDeclareBodyImpl exchangeDeclare =
+ ExchangeDeclareBodyImpl exchangeDeclare =
new ExchangeDeclareBodyImpl(_ticket, _exchangeName, _exchangeType, _passive, _durable,
_autoDelete, _internal, _nowait, _arguments);
Object[] authArgs = new Object[]{exchangeDeclare};
Object[] grantArgs = new Object[]{_exchangeName, _exchangeType};
-
+
assertEquals(AuthzResult.DENIED, _perms.authorise(Permission.CREATEEXCHANGE, authArgs));
_perms.grant(Permission.CREATEEXCHANGE, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEEXCHANGE, authArgs));
}
-
+
public void testConsume()
{
Object[] authArgs = new Object[]{_queue};
Object[] grantArgs = new Object[]{_queueName, _ownQueue};
- assertEquals(AuthzResult.DENIED,_perms.authorise(Permission.CONSUME, authArgs));
+ /* FIXME: This throws a null pointer exception QPID-1599
+ * assertFalse(_perms.authorise(Permission.CONSUME, authArgs));
+ */
_perms.grant(Permission.CONSUME, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CONSUME, authArgs));
}
@@ -166,7 +167,7 @@
_perms.grant(Permission.PUBLISH, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.PUBLISH, authArgs));
}
-
+
public void testVhostAccess()
{
//Tests that granting a user Virtualhost level access allows all authorisation requests
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java Sun Oct 25 22:58:57 2009
@@ -14,21 +14,20 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.security.access;
import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.PrincipalHolder;
public class QueueDenier extends AllowAll
{
-
+
public static final ACLPluginFactory FACTORY = new ACLPluginFactory()
{
public boolean supportsTag(String name)
@@ -43,18 +42,18 @@
return plugin;
}
};
-
+
private String _queueName = "";
-
+
@Override
- public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
+ public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
{
if (!(queue.getName().toString().equals(_queueName)))
{
return AuthzResult.ALLOWED;
- }
- else
+ }
+ else
{
return AuthzResult.DENIED;
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Sun Oct 25 22:58:57 2009
@@ -29,17 +29,13 @@
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.ExchangeBinding;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -103,7 +99,7 @@
try
{
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), configuration));
+ _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), configuration));
ApplicationRegistry.getInstance().getVirtualHostRegistry().registerVirtualHost(_virtualHost);
}
catch (Exception e)
@@ -169,7 +165,7 @@
Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
- //Send Message To NonDurable direct Exchange = persistent
+ //Send Message To NonDurable direct Exchange = persistent
sendMessageOnExchange(nonDurableExchange, directRouting, true);
// and non-persistent
sendMessageOnExchange(nonDurableExchange, directRouting, false);
@@ -344,22 +340,11 @@
MessagePublishInfo messageInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey);
- IncomingMessage currentMessage = null;
+ final IncomingMessage currentMessage;
- try
- {
- currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(),
- messageInfo,
- new NonTransactionalContext(_virtualHost.getMessageStore(),
- new StoreContext(), null, null),
- new InternalTestProtocolSession(_virtualHost));
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- currentMessage.setMessageStore(_virtualHost.getMessageStore());
+ currentMessage = new IncomingMessage(messageInfo);
+
currentMessage.setExchange(directExchange);
ContentHeaderBody headerBody = new ContentHeaderBody();
@@ -379,35 +364,42 @@
currentMessage.setExpiration();
- try
- {
- currentMessage.route();
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ MessageMetaData mmd = currentMessage.headersReceived();
+ currentMessage.setStoredMessage(_virtualHost.getMessageStore().addMessage(mmd));
+
+ currentMessage.route();
+
- try
- {
- currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
// check and deliver if header says body length is zero
if (currentMessage.allContentReceived())
{
- try
- {
- currentMessage.deliverToQueues();
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ // TODO Deliver to queues
+ ServerTransaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore());
+ final List<AMQQueue> destinationQueues = currentMessage.getDestinationQueues();
+ trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ AMQMessage message = new AMQMessage(currentMessage.getStoredMessage());
+
+ for(AMQQueue queue : destinationQueues)
+ {
+ QueueEntry entry = queue.enqueue(message);
+ }
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
}
}
@@ -496,14 +488,7 @@
fail(e.getMessage());
}
- try
- {
- _virtualHost.getQueueRegistry().registerQueue(queue);
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ _virtualHost.getQueueRegistry().registerQueue(queue);
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Sun Oct 25 22:58:57 2009
@@ -25,14 +25,15 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.logging.LogSubject;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
/**
* A message store that does nothing. Designed to be used in tests that do not want to use any message store
@@ -45,8 +46,19 @@
public void configure(String base, Configuration config) throws Exception
{
}
-
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -55,7 +67,12 @@
{
}
- public void removeMessage(StoreContext s, Long messageId)
+ public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeMessage(Long messageId)
{
}
@@ -85,24 +102,10 @@
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
- }
-
- public void beginTran(StoreContext s) throws AMQException
- {
}
- public boolean inTran(StoreContext sc)
- {
- return false;
- }
- public void commitTran(StoreContext storeContext) throws AMQException
- {
- }
- public void abortTran(StoreContext storeContext) throws AMQException
- {
- }
public List<AMQQueue> createQueues() throws AMQException
{
@@ -114,22 +117,26 @@
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ public void storeContentBodyChunk(
+ Long messageId,
+ int index,
+ ContentChunk contentBody,
+ boolean lastContentBody) throws AMQException
{
}
- public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException
{
}
- public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
{
return null;
}
- public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
+ public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException
{
return null;
}
@@ -139,18 +146,75 @@
return false;
}
- public void removeQueue(final AMQQueue queue) throws AMQException
+ public void storeMessageHeader(Long messageNumber, ServerMessage message)
{
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ public void storeContent(Long messageNumber, long offset, ByteBuffer body)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public ServerMessage getMessage(Long messageNumber)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeQueue(final AMQQueue queue) throws AMQException
{
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
{
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ public Transaction newTransaction()
+ {
+ return new Transaction()
+ {
+
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void commitTran() throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+ }
+
+ public void abortTran() throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ };
}
+
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java Sun Oct 25 22:58:57 2009
@@ -20,32 +20,79 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
+import java.nio.ByteBuffer;
/**
* Adds some extra methods to the memory message store for testing purposes.
*/
public class TestMemoryMessageStore extends MemoryMessageStore
{
+ private AtomicInteger _messageCount = new AtomicInteger(0);
+
+
public TestMemoryMessageStore()
{
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
}
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+ @Override
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
{
- return _metaDataMap;
+ return new TestableStoredMessage(super.addMessage(metaData));
}
- public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
+ public int getMessageCount()
{
- return _contentBodyMap;
+ return _messageCount.get();
+ }
+
+ private class TestableStoredMessage implements StoredMessage
+ {
+ private final StoredMessage _storedMessage;
+
+ public TestableStoredMessage(StoredMessage storedMessage)
+ {
+ _messageCount.incrementAndGet();
+ _storedMessage = storedMessage;
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ return _storedMessage.getMetaData();
+ }
+
+ public long getMessageNumber()
+ {
+ return _storedMessage.getMessageNumber();
+ }
+
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ _storedMessage.addContent(offsetInMessage, src);
+ }
+
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ return _storedMessage.getContent(offsetInMessage, dst);
+ }
+
+ public StoreFuture flushToStore()
+ {
+ return _storedMessage.flushToStore();
+ }
+
+ public void remove()
+ {
+ _storedMessage.remove();
+ _messageCount.decrementAndGet();
+ }
+
}
+
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Sun Oct 25 22:58:57 2009
@@ -26,9 +26,8 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.AMQMessageHandle;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
/**
* Tests that reference counting works correctly with AMQMessage and the message store
@@ -37,13 +36,12 @@
{
private TestMemoryMessageStore _store;
- private StoreContext _storeContext = new StoreContext();
-
protected void setUp() throws Exception
{
super.setUp();
_store = new TestMemoryMessageStore();
+
}
/**
@@ -83,11 +81,12 @@
};
- final long messageId = _store.getNewMessageId();
- AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
- messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb);
- AMQMessage message = new AMQMessage(messageHandle,
- _storeContext,info);
+
+ MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+ StoredMessage storedMessage = _store.addMessage(mmd);
+
+
+ AMQMessage message = new AMQMessage(storedMessage);
message = message.takeReference();
@@ -95,9 +94,9 @@
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
- assertEquals(1, _store.getMessageMetaDataMap().size());
- message.decrementReference(_storeContext);
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ assertEquals(1, _store.getMessageCount());
+ message.decrementReference();
+ assertEquals(1, _store.getMessageCount());
}
private ContentHeaderBody createPersistentContentHeader()
@@ -141,25 +140,24 @@
}
};
- final Long messageId = _store.getNewMessageId();
final ContentHeaderBody chb = createPersistentContentHeader();
- AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
- messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb);
- AMQMessage message = new AMQMessage(messageHandle,
- _storeContext,
- info);
-
-
+
+ MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+ StoredMessage storedMessage = _store.addMessage(mmd);
+
+ AMQMessage message = new AMQMessage(storedMessage);
+
+
message = message.takeReference();
// we call routing complete to set up the handle
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ assertEquals(1, _store.getMessageCount());
message = message.takeReference();
- message.decrementReference(_storeContext);
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ message.decrementReference();
+ assertEquals(1, _store.getMessageCount());
}
public static junit.framework.Test suite()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org