You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/11 17:10:48 UTC
svn commit: r824084 [4/5] - in /qpid/branches/java-broker-0-10/qpid/java:
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apache/q...
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Sun Oct 11 15:10:43 2009
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -30,7 +28,6 @@
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -52,11 +49,11 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
-import org.apache.qpid.server.security.access.plugins.SimpleXML;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import javax.management.NotCompliantMBeanException;
import java.util.Collections;
@@ -228,17 +225,17 @@
// file and write them in to the new routing Table.
for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue)
{
- _messageStore.createQueue(cqt.queue, cqt.arguments);
+ getDurableConfigurationStore().createQueue(cqt.queue, cqt.arguments);
}
for (Exchange exchange : configFileRT.exchange)
{
- _messageStore.createExchange(exchange);
+ getDurableConfigurationStore().createExchange(exchange);
}
for (StartupRoutingTable.CreateBindingTuple cbt : configFileRT.bindings)
{
- _messageStore.bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments);
+ getDurableConfigurationStore().bindQueue(cbt.exchange, cbt.routingKey, cbt.queue, cbt.arguments);
}
_authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig);
@@ -352,7 +349,7 @@
if (queue.isDurable())
{
- _messageStore.createQueue(queue);
+ getDurableConfigurationStore().createQueue(queue);
}
String exchangeName = queueConfiguration.getExchange();
@@ -416,6 +413,11 @@
return _messageStore;
}
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return _messageStore;
+ }
+
public AuthenticationManager getAuthenticationManager()
{
return _authenticationManager;
@@ -487,7 +489,7 @@
{
}
- public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+ public void removeMessage(Long messageId) throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -553,6 +555,24 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public StoreFuture commitTranAsync(StoreContext context) throws AMQException
+ {
+ commitTran(context);
+ return new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+
+ }
+
public void abortTran(StoreContext context) throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -568,22 +588,26 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ public void storeContentBodyChunk(
+ Long messageId,
+ int index,
+ ContentChunk contentBody,
+ boolean lastContentBody) throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java Sun Oct 11 15:10:43 2009
@@ -62,6 +62,6 @@
protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue)
{
- fromQueue.removeMessagesFromQueue(start, end, _storeContext);
+ fromQueue.removeMessagesFromQueue(start, end);
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Sun Oct 11 15:10:43 2009
@@ -171,7 +171,7 @@
// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getEncoding();
// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getExpiration();
// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders();
-// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageNumber();
// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPriority();
// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPropertyFlags();
// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getReplyTo();
@@ -182,14 +182,14 @@
// //Print out all the property names
// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders().getPropertyNames();
//
-// msg.getMessageId();
+// msg.getMessageNumber();
// msg.getSize();
// msg.getArrivalTime();
// msg.getDeliveredSubscription();
// msg.getDeliveredToConsumer();
// msg.getMessageHandle();
-// msg.getMessageId();
+// msg.getMessageNumber();
// msg.getMessagePublishInfo();
// msg.getPublisher();
@@ -352,7 +352,7 @@
ispersitent.add(msg.isPersistent() ? "true" : "false");
- isredelivered.add(msg.isRedelivered() ? "true" : "false");
+ isredelivered.add(entry.isRedelivered() ? "true" : "false");
isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false");
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java Sun Oct 11 15:10:43 2009
@@ -22,7 +22,6 @@
import junit.framework.TestCase;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.queue.MockQueueEntry;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleQueueEntryList;
import org.apache.qpid.server.queue.MockAMQMessage;
@@ -30,15 +29,15 @@
import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.AMQException;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.LinkedList;
-import java.util.Iterator;
/**
* QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
@@ -63,6 +62,7 @@
UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
private AMQQueue _queue = new MockAMQQueue(getName());
+ private MessageStore _messageStore = new MemoryMessageStore();
private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
@Override
@@ -137,7 +137,7 @@
// requeueIfUnabletoResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, new StoreContext()));
+ msgToResend, true, _messageStore));
assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -166,7 +166,7 @@
// requeueIfUnabletoResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, new StoreContext()));
+ msgToResend, true, _messageStore));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
@@ -187,7 +187,7 @@
// requeueIfUnabletoResend = true so all messages should go to msgToRequeue
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, new StoreContext()));
+ msgToResend, true, _messageStore));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
@@ -208,7 +208,7 @@
// requeueIfUnabletoResend = false so all messages should be dropped all maps should be empty
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, new StoreContext()));
+ msgToResend, false, _messageStore));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -240,7 +240,7 @@
// requeueIfUnabletoResend : value doesn't matter here as queue has been deleted
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, new StoreContext()));
+ msgToResend, false, _messageStore));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Sun Oct 11 15:10:43 2009
@@ -421,7 +421,7 @@
{
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
- assertEquals(0, serverConfig.getMaximumMessageCount());
+ assertEquals(0, serverConfig.getMaximumMessageCount());
// Check value we set
_config.setProperty("maximumMessageCount", 10L);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Sun Oct 11 15:10:43 2009
@@ -29,13 +29,8 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
@@ -54,8 +49,6 @@
*/
private MessageStore _store = new MemoryMessageStore();
- private StoreContext _storeContext = new StoreContext();
-
private MessageHandleFactory _handleFactory = new MessageHandleFactory();
private int count;
@@ -93,14 +86,18 @@
}
- protected void route(Message m) throws AMQException
+ protected int route(Message m) throws AMQException
{
m.route(exchange);
m.getIncomingMessage().routingComplete(_store, _handleFactory);
if(m.getIncomingMessage().allContentReceived())
{
- m.getIncomingMessage().deliverToQueues();
+ for(AMQQueue q : m.getIncomingMessage().getDestinationQueues())
+ {
+ q.enqueue(m);
+ }
}
+ return m.getIncomingMessage().getDestinationQueues().size();
}
protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
@@ -120,10 +117,8 @@
protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
{
- try
- {
- route(m);
- assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ int queueCount = route(m);
+
for (TestQueue q : queues)
{
if (expected.contains(q))
@@ -137,12 +132,11 @@
//assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
}
}
- }
- catch (NoRouteException ex)
- {
- assertTrue("Expected "+m+" not to be returned",expectReturn);
- }
+ if(expectReturn)
+ {
+ assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount);
+ }
}
@@ -244,6 +238,11 @@
{
final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
+ public String toString()
+ {
+ return getName().toString();
+ }
+
public TestQueue(AMQShortString name) throws AMQException
{
super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
@@ -334,6 +333,11 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean releaseButRetain()
+ {
+ return false;
+ }
+
public boolean immediateAndNotDelivered()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -344,6 +348,16 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public AMQMessageHeader getMessageHeader()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isPersistent()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isRedelivered()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -369,7 +383,7 @@
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void requeue(StoreContext storeContext) throws AMQException
+ public void requeue()
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -379,12 +393,12 @@
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+ public void dequeue()
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dispose(final StoreContext storeContext) throws MessageCleanupException
+ public void dispose()
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -394,7 +408,7 @@
//To change body of implemented methods use File | Settings | File Templates.
}
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void discard()
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -438,10 +452,9 @@
public TestIncomingMessage(final long messageId,
final MessagePublishInfo info,
- final TransactionalContext txnContext,
final AMQProtocolSession publisher)
{
- super(messageId, info, txnContext, publisher);
+ super(messageId, info, publisher);
}
@@ -468,14 +481,6 @@
private static MessageStore _messageStore = new SkeletonMessageStore();
- private static StoreContext _storeContext = new StoreContext();
-
-
- private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>()
- );
-
Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
{
this(protocolSession, id, getHeaders(headers));
@@ -496,11 +501,11 @@
ContentHeaderBody header,
List<ContentBody> bodies) throws AMQException
{
- super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
+ super(createMessageHandle(messageId, publish, header), header, header.bodySize, publish);
- _incoming = new TestIncomingMessage(getMessageId(),publish, _txnContext, protocolsession);
+ _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
_incoming.setContentHeaderBody(header);
@@ -515,14 +520,7 @@
_messageStore,
true);
- try
- {
- amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
- }
- catch (AMQException e)
- {
-
- }
+ amqMessageHandle.setPublishAndContentHeaderBody(publish,header);
return amqMessageHandle;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Sun Oct 11 15:10:43 2009
@@ -25,20 +25,14 @@
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import java.util.LinkedList;
-
public class DestWildExchangeTest extends TestCase
{
@@ -46,7 +40,6 @@
VirtualHost _vhost;
MessageStore _store;
- StoreContext _context;
InternalTestProtocolSession _protocolSession;
@@ -56,7 +49,6 @@
_exchange = new TopicExchange();
_vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
_store = new MemoryMessageStore();
- _context = new StoreContext();
_protocolSession = new InternalTestProtocolSession(_vhost);
}
@@ -74,7 +66,7 @@
MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
- IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession);
+ IncomingMessage message = new IncomingMessage(0L, info, _protocolSession);
message.enqueue(_exchange.route(message));
@@ -89,33 +81,20 @@
IncomingMessage message = createMessage("a.b");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c");
- try
- {
- routeMessage(message);
- fail("Message has no route and should fail to be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -129,52 +108,33 @@
IncomingMessage message = createMessage("a.b");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ int queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a");
- try
- {
- routeMessage(message);
- fail("Message has no route and should fail to be routed");
- }
- catch (AMQException nre)
- {
- }
+
+ queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -187,89 +147,56 @@
IncomingMessage message = createMessage("a.b.c");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ int queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.b");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has route and should be routed");
- }
+ queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("b");
- try
- {
- routeMessage(message);
- fail("Message has no route and should fail to be routed");
- }
- catch (AMQException nre)
- {
- }
+
+ queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -283,38 +210,24 @@
IncomingMessage message = createMessage("a.c.d.b");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c.b");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -327,66 +240,39 @@
IncomingMessage message = createMessage("a.c.b.b");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.a.b.c");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.b.c.b");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.b.c.b.c");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
-
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -400,34 +286,21 @@
IncomingMessage message = createMessage("a.c.b.b.c");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.a.b.c.d");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -440,33 +313,20 @@
IncomingMessage message = createMessage("a.c.b.b.c");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.a.b.c.d");
- try
- {
- routeMessage(message);
- }
- catch (AMQException nre)
- {
- fail("Message has no route and should be routed");
- }
+ routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
- queue.deleteMessageFromTop(_context);
+ queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -479,25 +339,24 @@
IncomingMessage message = createMessage("a.b.c");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
- private void routeMessage(final IncomingMessage message)
+ private int routeMessage(final IncomingMessage message)
throws AMQException
{
message.enqueue(_exchange.route(message));
message.routingComplete(_store, new MessageHandleFactory());
- message.deliverToQueues();
+ AMQMessage msg = new AMQMessage(message.getMessageHandle(), message.getContentHeader(), message.getSize(), message.getMessagePublishInfo());
+ for(AMQQueue q : message.getDestinationQueues())
+ {
+ q.enqueue(msg);
+ }
+ return message.getDestinationQueues().size();
}
public void testMoreRouting() throws AMQException
@@ -508,14 +367,8 @@
IncomingMessage message = createMessage("a.b.c");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
@@ -529,14 +382,8 @@
IncomingMessage message = createMessage("a");
- try
- {
- routeMessage(message);
- fail("Message has route and should not be routed");
- }
- catch (AMQException nre)
- {
- }
+ int queueCount = routeMessage(message);
+ Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
@@ -546,11 +393,7 @@
{
MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
- TransactionalContext trancontext = new NonTransactionalContext(_store, _context, null,
- new LinkedList<RequiredDeliveryException>()
- );
-
- IncomingMessage message = new IncomingMessage(0L, info, trancontext,_protocolSession);
+ IncomingMessage message = new IncomingMessage(0L, info, _protocolSession);
message.setContentHeaderBody( new ContentHeaderBody());
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Sun Oct 11 15:10:43 2009
@@ -21,17 +21,17 @@
package org.apache.qpid.server.protocol;
import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TestNetworkDriver;
@@ -68,7 +68,7 @@
public byte getProtocolMajorVersion()
{
return (byte) 8;
- }
+ }
public byte getProtocolMinorVersion()
{
@@ -99,6 +99,15 @@
}
}
+ public void writeReturn(MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody header,
+ Iterator<AMQDataBlock> bodyFrameIterator,
+ int channelId,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+
+ }
// *** ProtocolOutputConverter Implementation
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
@@ -108,7 +117,7 @@
{
}
- public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
{
_deliveryCount.incrementAndGet();
@@ -130,11 +139,11 @@
consumers.put(consumerTag, consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
}
}
- public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException
{
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Sun Oct 11 15:10:43 2009
@@ -20,13 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.LinkedList;
-
-import javax.management.Notification;
-
import junit.framework.TestCase;
-
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -34,7 +28,6 @@
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
@@ -42,13 +35,13 @@
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.management.Notification;
+import java.util.ArrayList;
+
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
{
@@ -61,11 +54,6 @@
private VirtualHost _virtualHost;
private AMQProtocolEngine _protocolSession;
private MessageStore _messageStore = new MemoryMessageStore();
- private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>()
- );
private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
/**
@@ -75,6 +63,10 @@
*/
public void testMessageCountAlert() throws Exception
{
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ _protocolSession.addChannel(channel);
+
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
false, _virtualHost,
null);
@@ -82,7 +74,7 @@
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
- sendMessages(MAX_MESSAGE_COUNT, 256l);
+ sendMessages(channel, MAX_MESSAGE_COUNT, 256l);
assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
Notification lastNotification = _queueMBean.getLastNotification();
@@ -99,6 +91,10 @@
*/
public void testMessageSizeAlert() throws Exception
{
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ _protocolSession.addChannel(channel);
+
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
false, _virtualHost,
null);
@@ -106,7 +102,7 @@
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
_queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
- sendMessages(1, MAX_MESSAGE_SIZE * 2);
+ sendMessages(channel, 1, MAX_MESSAGE_SIZE * 2);
assertTrue(_queueMBean.getMessageCount() == 1);
Notification lastNotification = _queueMBean.getLastNotification();
@@ -125,6 +121,10 @@
*/
public void testQueueDepthAlertNoSubscriber() throws Exception
{
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ _protocolSession.addChannel(channel);
+
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
false, _virtualHost,
null);
@@ -134,7 +134,7 @@
while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
{
- sendMessages(1, MAX_MESSAGE_SIZE);
+ sendMessages(channel, 1, MAX_MESSAGE_SIZE);
}
Notification lastNotification = _queueMBean.getLastNotification();
@@ -154,6 +154,10 @@
*/
public void testMessageAgeAlert() throws Exception
{
+ _protocolSession = new InternalTestProtocolSession(_virtualHost);
+ AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+ _protocolSession.addChannel(channel);
+
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
false, _virtualHost,
null);
@@ -161,7 +165,7 @@
_queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
_queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
- sendMessages(1, MAX_MESSAGE_SIZE);
+ sendMessages(channel, 1, MAX_MESSAGE_SIZE);
// Ensure message sits on queue long enough to age.
Thread.sleep(MAX_MESSAGE_AGE * 2);
@@ -201,7 +205,7 @@
// Send messages (the number of messages is a little more than what would cause a Queue_Depth alert)
int messageCount = Math.round(MAX_QUEUE_DEPTH / MAX_MESSAGE_SIZE) + 10;
long totalSize = (messageCount * MAX_MESSAGE_SIZE);
- sendMessages(messageCount, MAX_MESSAGE_SIZE);
+ sendMessages(channel, messageCount, MAX_MESSAGE_SIZE);
// Check queueDepth. There should be no messages on the queue, as the subscriber is listening,
// so there should be no Queue_Depth alert raised
@@ -281,7 +285,7 @@
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = size; // in bytes
- IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _protocolSession);
message.setContentHeaderBody(contentHeaderBody);
return message;
@@ -305,7 +309,7 @@
}
- private void sendMessages(long messageCount, final long size) throws AMQException
+ private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException
{
IncomingMessage[] messages = new IncomingMessage[(int) messageCount];
for (int i = 0; i < messages.length; i++)
@@ -339,7 +343,11 @@
}
});
- messages[i].deliverToQueues();
+ _queue.enqueue(new AMQMessage(messages[i].getMessageHandle(),
+ messages[i].getContentHeader(),
+ messages[i].getSize(),
+ messages[i].getMessagePublishInfo()));
+
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Sun Oct 11 15:10:43 2009
@@ -29,7 +29,6 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -38,10 +37,7 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.mina.common.ByteBuffer;
@@ -49,8 +45,6 @@
import javax.management.JMException;
import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Collections;
/**
* Test class to test AMQQueueMBean attributes and operations
@@ -61,8 +55,6 @@
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private MessageStore _messageStore;
- private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext;
private VirtualHost _virtualHost;
private AMQProtocolSession _protocolSession;
private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
@@ -108,7 +100,7 @@
//Ensure that the data has been removed from the Store
verifyBrokerState();
}
-
+
public void testDeleteMessages() throws Exception
{
int messageCount = 10;
@@ -129,9 +121,9 @@
}
catch(Exception e)
{
-
+
}
-
+
//delete last message, leaving 2nd to 9th
_queueMBean.deleteMessages(10L,10L);
assertTrue(_queueMBean.getMessageCount() == (messageCount - 2));
@@ -143,7 +135,7 @@
}
catch(Exception e)
{
-
+
}
//delete remaining messages, leaving none
@@ -162,7 +154,7 @@
TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
// Unlike MessageReturnTest there is no need for a delay, as this thread does the clean up.
- assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
+ assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
@@ -170,7 +162,7 @@
public void testConsumerCount() throws AMQException
{
-
+
assertTrue(_queue.getActiveConsumerCount() == 0);
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
@@ -182,7 +174,7 @@
Subscription subscription =
SUBSCRIPTION_FACTORY.createSubscription(channel.getChannelId(), protocolSession, new AMQShortString("test"), false, null, false, channel.getCreditManager());
-
+
_queue.registerSubscription(subscription, false);
assertEquals(1,(int)_queueMBean.getActiveConsumerCount());
@@ -225,7 +217,6 @@
assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth));
assertTrue(_queueMBean.getName().equals("testQueue"));
- assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
assertFalse(_queueMBean.isAutoDelete());
assertFalse(_queueMBean.isDurable());
}
@@ -261,7 +252,7 @@
{
}
-
+
try
{
long end = Integer.MAX_VALUE;
@@ -275,13 +266,12 @@
}
IncomingMessage msg = message(false, false);
- long id = msg.getMessageId();
- _queue.clearQueue(_storeContext);
+ long id = msg.getMessageNumber();
+ _queue.clearQueue();
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
msg.routingComplete(_messageStore, new MessageHandleFactory());
-
msg.addContentBodyFrame(new ContentChunk()
{
ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
@@ -301,7 +291,12 @@
}
});
- msg.deliverToQueues();
+
+ AMQMessage m = new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo());
+ for(AMQQueue q : msg.getDestinationQueues())
+ {
+ q.enqueue(m);
+ }
// _queue.process(_storeContext, new QueueEntry(_queue, msg), false);
_queueMBean.viewMessageContent(id);
try
@@ -350,7 +345,7 @@
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+ IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _protocolSession);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
@@ -364,11 +359,6 @@
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
_messageStore = _virtualHost.getMessageStore();
- _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>()
- );
-
_queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost,
null);
_queueMBean = new AMQQueueMBean(_queue);
@@ -400,7 +390,12 @@
.convertToContentChunk(
new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
MESSAGE_SIZE)));
- currentMessage.deliverToQueues();
+
+ AMQMessage m = new AMQMessage(currentMessage.getMessageHandle(), currentMessage.getContentHeader(), currentMessage.getSize(), currentMessage.getMessagePublishInfo());
+ for(AMQQueue q : currentMessage.getDestinationQueues())
+ {
+ q.enqueue(m);
+ }
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Sun Oct 11 15:10:43 2009
@@ -28,7 +28,8 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,15 +40,9 @@
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.NullApplicationRegistry;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.Set;
-import java.util.Collections;
/**
* Tests that acknowledgements are handled correctly.
@@ -62,8 +57,6 @@
private TestMemoryMessageStore _messageStore;
- private StoreContext _storeContext = new StoreContext();
-
private AMQChannel _channel;
private AMQQueue _queue;
@@ -99,9 +92,6 @@
private void publishMessages(int count, boolean persistent) throws AMQException
{
- TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
- new LinkedList<RequiredDeliveryException>()
- );
_queue.registerSubscription(_subscription,false);
MessageHandleFactory factory = new MessageHandleFactory();
for (int i = 1; i <= count; i++)
@@ -136,7 +126,7 @@
return new AMQShortString("rk");
}
};
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession);
+ final IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, _protocolSession);
//IncomingMessage msg2 = null;
if (persistent)
{
@@ -160,7 +150,26 @@
msg.routingComplete(_messageStore, factory);
if(msg.allContentReceived())
{
- msg.deliverToQueues();
+ Transaction txn = new AutoCommitTransaction(_messageStore);
+ txn.enqueue(_queue, msg, new Transaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ _queue.enqueue(new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo()));
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
+
}
// we manually send the message to the subscription
//_subscription.send(new QueueEntry(_queue,msg), _queue);
@@ -178,8 +187,7 @@
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+ assertEquals("",msgCount,map.size());
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -191,8 +199,6 @@
assertTrue(unackedMsg.getQueue() == _queue);
}
- assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
}
/**
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Sun Oct 11 15:10:43 2009
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -30,8 +29,7 @@
throws AMQException
{
super(new MockAMQMessageHandle(messageId) ,
- (StoreContext)null,
- (MessagePublishInfo)new MockMessagePublishInfo());
+ (MessagePublishInfo)new MockMessagePublishInfo());
}
protected MockAMQMessage(AMQMessage msg)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java Sun Oct 11 15:10:43 2009
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.store.StoreContext;
-
public class MockAMQMessageHandle extends InMemoryMessageHandle
{
public MockAMQMessageHandle(final Long messageId)
@@ -30,7 +28,7 @@
}
@Override
- public long getBodySize(StoreContext store)
+ public long getBodySize()
{
return 0l;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Sun Oct 11 15:10:43 2009
@@ -23,25 +23,19 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.PrincipalHolder;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
import java.util.List;
import java.util.Set;
import java.util.Map;
-import java.util.HashMap;
-import java.util.LinkedList;
public class MockAMQQueue implements AMQQueue
{
@@ -51,6 +45,8 @@
private PrincipalHolder _principalHolder;
+ private Object _exclusiveOwner;
+
public MockAMQQueue(String name)
{
_name = new AMQShortString(name);
@@ -171,7 +167,7 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
+ public void requeue(QueueEntry entry)
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -181,7 +177,7 @@
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
+ public void dequeue(QueueEntry entry)
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -236,7 +232,7 @@
//To change body of implemented methods use File | Settings | File Templates.
}
- public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
+ public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -295,12 +291,12 @@
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
+ public void deleteMessageFromTop()
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public long clearQueue(StoreContext storeContext) throws AMQException
+ public long clearQueue()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -399,4 +395,15 @@
_principalHolder = principalHolder;
}
+ public Object getExclusiveOwner()
+ {
+ return _exclusiveOwner;
+ }
+
+ public void setExclusiveOwner(Object exclusiveOwner)
+ {
+ _exclusiveOwner = exclusiveOwner;
+ }
+
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Sun Oct 11 15:10:43 2009
@@ -21,8 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.AMQMessageHeader;
public class MockQueueEntry implements QueueEntry
{
@@ -59,17 +59,17 @@
return false;
}
- public void dequeue(StoreContext storeContext) throws FailedDequeueException
+ public void dequeue()
{
}
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void discard()
{
}
- public void dispose(StoreContext storeContext) throws MessageCleanupException
+ public void dispose()
{
}
@@ -154,7 +154,12 @@
}
-
+ public boolean releaseButRetain()
+ {
+ return false;
+ }
+
+
public boolean removeStateChangeListener(StateChangeListener listener)
{
@@ -162,7 +167,7 @@
}
- public void requeue(StoreContext storeContext) throws AMQException
+ public void requeue()
{
@@ -187,6 +192,16 @@
}
+ public AMQMessageHeader getMessageHeader()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isPersistent()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isRedelivered()
{
return false;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org