You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/05/02 18:49:05 UTC
svn commit: r534541 [1/3] - in /incubator/qpid/trunk/qpid/java: broker/etc/
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/configuration/
broker/src/main/java/org/apache/qpid/server/exception/ broker/src/main/j...
Author: rajith
Date: Wed May 2 09:49:03 2007
New Revision: 534541
URL: http://svn.apache.org/viewvc?view=rev&rev=534541
Log:
I am committing the patch supplied by Arnaud Simon. This patch contains support for dtx.
Currently there is one test case failing. I will try to fix it; if not, Arnaud will provide a patch soon.
Added:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/EnqueueRecord.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionRecord.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/XAFlag.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/XidImpl.java
Modified:
incubator/qpid/trunk/qpid/java/broker/etc/config.xml
incubator/qpid/trunk/qpid/java/broker/etc/persistent_config.xml
incubator/qpid/trunk/qpid/java/broker/etc/transient_config.xml
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
Modified: incubator/qpid/trunk/qpid/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/config.xml?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/config.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/etc/config.xml Wed May 2 09:49:03 2007
@@ -90,14 +90,18 @@
<virtualhosts>
<virtualhost>
<name>localhost</name>
- <localhost>
- <store>
- <!-- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
- <environment-path>${work}/localhost-store</environment-path> -->
-
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- </store>
-
+ <localhost>
+ <store>
+ <environment-path>${work}/localhost-store</environment-path>
+ <!-- <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class> -->
+ <class>org.apache.qpid.server.messageStore.MemoryMessageStore</class>
+ </store>
+ <txn>
+ <environment-tx-timeout>60</environment-tx-timeout>
+ <!-- <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class> -->
+ <class>org.apache.qpid.server.messageStore.MemoryTransactionManager</class>
+ </txn>
+
<security>
<!-- Need protocol changes to allow this-->
<authentication>
@@ -125,9 +129,12 @@
<virtualhost>
<name>development</name>
<development>
- <store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- </store>
+ <store>
+ <class>org.apache.qpid.server.messageStore.MemoryMessageStore</class>
+ </store>
+ <txn>
+ <class>org.apache.qpid.server.messageStore.MemoryTransactionManager</class>
+ </txn>
<security>
<name>passwordfile-notusedyet</name>
<mechanism>PLAIN</mechanism>
@@ -139,9 +146,12 @@
<virtualhost>
<name>test</name>
<test>
- <store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- </store>
+ <store>
+ <class>org.apache.qpid.server.messageStore.MemoryMessageStore</class>
+ </store>
+ <txn>
+ <class>org.apache.qpid.server.messageStore.MemoryTransactionManager</class>
+ </txn>
<security>
<name>passwordfile-notusedyet</name>
<mechanism>PLAIN</mechanism>
Modified: incubator/qpid/trunk/qpid/java/broker/etc/persistent_config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/persistent_config.xml?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/persistent_config.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/etc/persistent_config.xml Wed May 2 09:49:03 2007
@@ -73,12 +73,15 @@
<virtualhosts>
<virtualhost>
<name>localhost</name>
- <localhost>
- <store>
- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <localhost>
+ <store>
<environment-path>${work}/bdbstore/localhost-store</environment-path>
- </store>
-
+ <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class>
+ </store>
+ <txn>
+ <environment-tx-timeout>60</environment-tx-timeout>
+ <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class>
+ </txn>
<security>
<access>
<class>org.apache.qpid.server.security.access.PrincipalDatabaseAccessManager</class>
@@ -100,20 +103,28 @@
<virtualhost>
<name>development</name>
<development>
- <store>
- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
- <environment-path>${work}/bdbstore/development-store</environment-path>
- </store>
+ <store>
+ <environment-path>${work}/bdbstore/development-store</environment-path>
+ <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class>
+ </store>
+ <txn>
+ <environment-tx-timeout>60</environment-tx-timeout>
+ <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class>
+ </txn>
</development>
</virtualhost>
<virtualhost>
<name>test</name>
<test>
- <store>
- <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
- <environment-path>${work}/bdbstore/test-store</environment-path>
- </store>
+ <store>
+ <environment-path>${work}/bdbstore/test-store</environment-path>
+ <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class>
+ </store>
+ <txn>
+ <environment-tx-timeout>60</environment-tx-timeout>
+ <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class>
+ </txn>
</test>
</virtualhost>
Modified: incubator/qpid/trunk/qpid/java/broker/etc/transient_config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/transient_config.xml?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/transient_config.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/etc/transient_config.xml Wed May 2 09:49:03 2007
@@ -73,9 +73,14 @@
<virtualhost>
<name>localhost</name>
<localhost>
- <store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- </store>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class>
+ </store>
+ <txn>
+ <environment-tx-timeout>60</environment-tx-timeout>
+ <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class>
+ </txn>
+
<security>
<access>
@@ -98,18 +103,26 @@
<virtualhost>
<name>development</name>
<development>
- <store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- </store>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class>
+ </store>
+ <txn>
+ <environment-tx-timeout>60</environment-tx-timeout>
+ <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class>
+ </txn>
</development>
</virtualhost>
<virtualhost>
<name>test</name>
<test>
- <store>
- <class>org.apache.qpid.server.store.MemoryMessageStore</class>
- </store>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.messageStore.MessageStoreImpl</class>
+ </store>
+ <txn>
+ <environment-tx-timeout>60</environment-tx-timeout>
+ <class>org.apache.qpid.server.store.berkeleydb.txn.TransactionManagerImpl</class>
+ </txn>
</test>
</virtualhost>
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Wed May 2 09:49:03 2007
@@ -58,8 +58,10 @@
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
/**
* This MBean implements the broker management interface and exposes the
@@ -100,7 +102,6 @@
* @param exchangeName
* @param type
* @param durable
- * @param autoDelete
* @throws JMException
*/
public void createNewExchange(String exchangeName, String type, boolean durable) throws JMException
@@ -158,7 +159,6 @@
* @param queueName
* @param durable
* @param owner
- * @param autoDelete
* @throws JMException
*/
public void createNewQueue(String queueName, String owner, boolean durable) throws JMException
@@ -180,7 +180,13 @@
queue = new AMQQueue(new AMQShortString(queueName), durable, ownerShortString, false, getVirtualHost());
if (queue.isDurable() && !queue.isAutoDelete())
{
- _messageStore.createQueue(queue);
+ try
+ {
+ _messageStore.createQueue(queue);
+ } catch (Exception e)
+ {
+ throw new JMException("problem creating queue " + queue.getName());
+ }
}
Configuration virtualHostDefaultQueueConfiguration =
@@ -222,10 +228,9 @@
try
{
queue.delete();
- _messageStore.removeQueue(new AMQShortString(queueName));
-
+ _messageStore.destroyQueue(queue);
}
- catch (AMQException ex)
+ catch (Exception ex)
{
JMException jme = new JMException(ex.getMessage());
jme.initCause(ex);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed May 2 09:49:03 2007
@@ -47,11 +47,9 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.Subscription;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.LocalTransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.*;
public class AMQChannel
{
@@ -93,6 +91,8 @@
private final MessageStore _messageStore;
+ private final TransactionManager _transactionManager;
+
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
private final AtomicBoolean _suspended = new AtomicBoolean(false);
@@ -116,7 +116,8 @@
//Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
- public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
+
+ public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
_session = session;
@@ -125,6 +126,7 @@
_prefetch_HighWaterMark = DEFAULT_PREFETCH;
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
+ _transactionManager = transactionManager;
_exchanges = exchanges;
// by default the session is non-transactional
_txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
@@ -133,7 +135,7 @@
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
- _txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
+ _txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages);
}
public boolean isTransactional()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Wed May 2 09:49:03 2007
@@ -36,8 +36,10 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
public class VirtualHostConfiguration
{
@@ -48,7 +50,9 @@
private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost.";
- public VirtualHostConfiguration(String configFile) throws ConfigurationException
+ public VirtualHostConfiguration(String configFile)
+ throws
+ ConfigurationException
{
_logger.info("Loading Config file:" + configFile);
@@ -57,24 +61,25 @@
}
-
- private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException
+ private void configureVirtualHost(String virtualHostName, Configuration configuration)
+ throws
+ ConfigurationException,
+ AMQException
{
- _logger.debug("Loding configuration for virtaulhost: "+virtualHostName);
+ _logger.debug("Loding configuration for virtaulhost: " + virtualHostName);
VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
-
- if(virtualHost == null)
+ if (virtualHost == null)
{
throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
}
List exchangeNames = configuration.getList("exchanges.exchange.name");
- for(Object exchangeNameObj : exchangeNames)
+ for (Object exchangeNameObj : exchangeNames)
{
String exchangeName = String.valueOf(exchangeNameObj);
configureExchange(virtualHost, exchangeName, configuration);
@@ -83,7 +88,7 @@
List queueNames = configuration.getList("queues.queue.name");
- for(Object queueNameObj : queueNames)
+ for (Object queueNameObj : queueNames)
{
String queueName = String.valueOf(queueNameObj);
configureQueue(virtualHost, queueName, configuration);
@@ -91,12 +96,14 @@
}
- private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException
+ private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration)
+ throws
+ AMQException
{
CompositeConfiguration exchangeConfiguration = new CompositeConfiguration();
- exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString));
+ exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange." + exchangeNameString));
exchangeConfiguration.addConfiguration(configuration.subset("exchanges"));
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -110,18 +117,17 @@
Exchange exchange;
-
synchronized (exchangeRegistry)
{
exchange = exchangeRegistry.getExchange(exchangeName);
- if(exchange == null)
+ if (exchange == null)
{
- AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct"));
- boolean durable = exchangeConfiguration.getBoolean("durable",false);
- boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false);
+ AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type", "direct"));
+ boolean durable = exchangeConfiguration.getBoolean("durable", false);
+ boolean autodelete = exchangeConfiguration.getBoolean("autodelete", false);
- Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
+ Exchange newExchange = exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
exchangeRegistry.registerExchange(newExchange);
}
@@ -149,11 +155,14 @@
return queueConfiguration;
}
- private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException
+ private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration)
+ throws
+ AMQException,
+ ConfigurationException
{
CompositeConfiguration queueConfiguration = new CompositeConfiguration();
- queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString));
+ queueConfiguration.addConfiguration(configuration.subset("queues.queue." + queueNameString));
queueConfiguration.addConfiguration(configuration.subset("queues"));
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -173,7 +182,7 @@
{
_logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName());
- boolean durable = queueConfiguration.getBoolean("durable" ,false);
+ boolean durable = queueConfiguration.getBoolean("durable", false);
boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
String owner = queueConfiguration.getString("owner", null);
@@ -184,21 +193,32 @@
if (queue.isDurable())
{
- messageStore.createQueue(queue);
+ try
+ {
+ messageStore.createQueue(queue);
+ } catch (InternalErrorException e)
+ {
+ _logger.error("Problem when creating Queue '" + queueNameString
+ + "' on virtual host " + virtualHost.getName() + ", not creating.");
+
+ } catch (QueueAlreadyExistsException e)
+ {
+ _logger.error("Queue '" + queueNameString
+ + "' already exists on virtual host " + virtualHost.getName() + ", not creating.");
+ }
}
queueRegistry.registerQueue(queue);
- }
- else
+ } else
{
- _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating.");
+ _logger.info("Queue '" + queueNameString + "' already exists on virtual host " + virtualHost.getName() + ", not creating.");
}
String exchangeName = queueConfiguration.getString("exchange", null);
Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
- if(exchange == null)
+ if (exchange == null)
{
exchange = virtualHost.getExchangeRegistry().getDefaultExchange();
}
@@ -211,15 +231,15 @@
synchronized (exchange)
{
List routingKeys = queueConfiguration.getList("routingKey");
- if(routingKeys == null || routingKeys.isEmpty())
+ if (routingKeys == null || routingKeys.isEmpty())
{
routingKeys = Collections.singletonList(queue.getName());
}
- for(Object routingKeyNameObj : routingKeys)
+ for (Object routingKeyNameObj : routingKeys)
{
AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
-
+
queue.bind(routingKey, null, exchange);
@@ -227,31 +247,33 @@
_logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
}
- if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
+ if (exchange != virtualHost.getExchangeRegistry().getDefaultExchange())
{
- queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());
+ queue.bind(queue.getName(), null, virtualHost.getExchangeRegistry().getDefaultExchange());
}
}
}
-
Configurator.configure(queue, queueConfiguration);
}
- public void performBindings() throws AMQException, ConfigurationException
+ public void performBindings()
+ throws
+ AMQException,
+ ConfigurationException
{
List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name");
String defaultVirtualHostName = _config.getString("default");
- if(defaultVirtualHostName != null)
+ if (defaultVirtualHostName != null)
{
- ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName);
+ ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName);
}
_logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames);
- for(Object nameObject : virtualHostNames)
+ for (Object nameObject : virtualHostNames)
{
String name = String.valueOf(nameObject);
configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name));
@@ -263,7 +285,6 @@
"Virtualhost Configuration document does not contain a valid virtualhost.");
}
}
-
}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/CommandInvalidException.java Wed May 2 09:49:03 2007
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 15:52:29
+ */
+public class CommandInvalidException extends Exception
+{
+ /**
+ * Constructs a new CommandInvalidException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public CommandInvalidException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new CommandInvalidException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message.
+ * @param cause the cause.
+ */
+ public CommandInvalidException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new CommandInvalidException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public CommandInvalidException(Throwable cause)
+ {
+ super(cause);
+ }
+
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InternalErrorException.java Wed May 2 09:49:03 2007
@@ -0,0 +1,57 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 14:41:53
+ */
+public class InternalErrorException extends Exception
+{
+ /**
+ * Constructs a new InternalErrorException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public InternalErrorException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new InternalErrorException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message.
+ * @param cause the cause.
+ */
+ public InternalErrorException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new InternalErrorException with the specified cause.
+ *
+ * @param cause the cause
+ */
+ public InternalErrorException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/InvalidXidException.java Wed May 2 09:49:03 2007
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Checked exception reporting an invalid Xid; the detail message always embeds the xid.
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007, Time: 14:12:27
+ */
+public class InvalidXidException extends Exception
+{
+ /**
+ * Constructs a new InvalidXidException with a standard message.
+ *
+ * @param xid The invalid xid.
+ */
+ public InvalidXidException(Xid xid)
+ {
+ super("The Xid: " + xid + " is invalid");
+ }
+
+ /**
+ * Constructs a new InvalidXidException with a standard message and a cause.
+ *
+ * @param xid The invalid xid.
+ * @param cause The cause for the xid to be invalid.
+ */
+ public InvalidXidException(Xid xid, Throwable cause)
+ {
+ super("The Xid: " + xid + " is invalid", cause);
+ }
+
+ /**
+ * Constructs a new InvalidXidException whose message includes a reason.
+ *
+ * @param xid The invalid xid.
+ * @param reason The reason why the xid is invalid; appended to the standard message.
+ */
+ public InvalidXidException(Xid xid, String reason)
+ {
+ super("The Xid: " + xid + " is invalid, The reason is: " + reason);
+ }
+
+ /**
+ * Constructs a new InvalidXidException whose message includes a reason, with a cause.
+ *
+ * @param xid The invalid xid.
+ * @param reason The reason why the xid is invalid; appended to the standard message.
+ * @param cause The cause for the xid to be invalid.
+ */
+ public InvalidXidException(Xid xid, String reason, Throwable cause)
+ {
+ super("The Xid: " + xid + " is invalid, The reason is: " + reason, cause);
+ }
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageAlreadyStagedException.java Wed May 2 09:49:03 2007
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Checked exception; declared in the throws clause of MemoryMessageStore.stage(StorableMessage).
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007, Time: 09:46:31
+ */
+public class MessageAlreadyStagedException extends Exception
+{
+ /**
+ * Constructs a new MessageAlreadyStagedException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public MessageAlreadyStagedException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new MessageAlreadyStagedException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message.
+ * @param cause the underlying cause, passed to Exception(String, Throwable).
+ */
+ public MessageAlreadyStagedException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new MessageAlreadyStagedException with the specified cause.
+ *
+ * @param cause the underlying cause, passed to Exception(Throwable).
+ */
+ public MessageAlreadyStagedException(Throwable cause)
+ {
+ super(cause);
+ }
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/MessageDoesntExistException.java Wed May 2 09:49:03 2007
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Checked exception; declared by appendContent, loadContent, destroy and enqueue of MemoryMessageStore.
+ * Created by Arnaud Simon
+ * Date: 30-Mar-2007, Time: 10:52:29
+ */
+public class MessageDoesntExistException extends Exception
+{
+ /**
+ * Constructs a new MessageDoesntExistException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public MessageDoesntExistException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new MessageDoesntExistException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message.
+ * @param cause the underlying cause, passed to Exception(String, Throwable).
+ */
+ public MessageDoesntExistException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new MessageDoesntExistException with the specified cause.
+ *
+ * @param cause the underlying cause, passed to Exception(Throwable).
+ */
+ public MessageDoesntExistException(Throwable cause)
+ {
+ super(cause);
+ }
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/NotPreparedException.java Wed May 2 09:49:03 2007
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Checked exception (extends Exception) carrying a detail message and/or a cause.
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007, Time: 16:47:40
+ */
+public class NotPreparedException extends Exception
+{
+ /**
+ * Constructs a new NotPreparedException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public NotPreparedException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new NotPreparedException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message.
+ * @param cause the underlying cause, passed to Exception(String, Throwable).
+ */
+ public NotPreparedException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new NotPreparedException with the specified cause.
+ *
+ * @param cause the underlying cause, passed to Exception(Throwable).
+ */
+ public NotPreparedException(Throwable cause)
+ {
+ super(cause);
+ }
+}
\ No newline at end of file
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueAlreadyExistsException.java Wed May 2 09:49:03 2007
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Checked exception; declared in the throws clause of MemoryMessageStore.createQueue(StorableQueue).
+ * Created by Arnaud Simon
+ * Date: 30-Mar-2007, Time: 10:49:00
+ */
+public class QueueAlreadyExistsException extends Exception
+{
+ /**
+ * Constructs a new QueueAlreadyExistsException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public QueueAlreadyExistsException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new QueueAlreadyExistsException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message.
+ * @param cause the underlying cause, passed to Exception(String, Throwable).
+ */
+ public QueueAlreadyExistsException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new QueueAlreadyExistsException with the specified cause.
+ *
+ * @param cause the underlying cause, passed to Exception(Throwable).
+ */
+ public QueueAlreadyExistsException(Throwable cause)
+ {
+ super(cause);
+ }
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/QueueDoesntExistException.java Wed May 2 09:49:03 2007
@@ -0,0 +1,58 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+/**
+ * Checked exception; declared by destroyQueue, enqueue and dequeue of MemoryMessageStore.
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007, Time: 17:38:24
+ */
+public class QueueDoesntExistException extends Exception
+{
+ /**
+ * Constructs a new QueueDoesntExistException with the specified detail message.
+ *
+ * @param message the detail message.
+ */
+ public QueueDoesntExistException(String message)
+ {
+ super(message);
+ }
+
+ /**
+ * Constructs a new QueueDoesntExistException with the specified detail message and
+ * cause.
+ *
+ * @param message the detail message.
+ * @param cause the underlying cause, passed to Exception(String, Throwable).
+ */
+ public QueueDoesntExistException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new QueueDoesntExistException with the specified cause.
+ *
+ * @param cause the underlying cause, passed to Exception(Throwable).
+ */
+ public QueueDoesntExistException(Throwable cause)
+ {
+ super(cause);
+ }
+}
\ No newline at end of file
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exception/UnknownXidException.java Wed May 2 09:49:03 2007
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.exception;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Checked exception reporting an unknown Xid; the detail message always embeds the xid.
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007, Time: 15:45:06
+ */
+public class UnknownXidException extends Exception
+{
+ /**
+ * Constructs a new UnknownXidException with a standard message.
+ *
+ * @param xid The unknown xid.
+ */
+ public UnknownXidException(Xid xid)
+ {
+ super("The Xid: " + xid + " is unknown");
+ }
+
+ /**
+ * Constructs a new UnknownXidException with a standard message and a cause.
+ *
+ * @param xid The unknown xid.
+ * @param cause The cause for the xid to be unknown.
+ */
+ public UnknownXidException(Xid xid, Throwable cause)
+ {
+ super("The Xid: " + xid + " is unknown", cause);
+ }
+
+ /**
+ * Constructs a new UnknownXidException whose message includes a reason.
+ *
+ * @param xid The unknown xid.
+ * @param reason The reason why the xid is unknown; appended to the standard message.
+ */
+ public UnknownXidException(Xid xid, String reason)
+ {
+ super("The Xid: " + xid + " is unknown, The reason is: " + reason);
+ }
+
+ /**
+ * Constructs a new UnknownXidException whose message includes a reason, with a cause.
+ *
+ * @param xid The unknown xid.
+ * @param reason The reason why the xid is unknown; appended to the standard message.
+ * @param cause The cause for the xid to be unknown.
+ */
+ public UnknownXidException(Xid xid, String reason, Throwable cause)
+ {
+ super("The Xid: " + xid + " is unknown, The reason is: " + reason, cause);
+ }
+}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Wed May 2 09:49:03 2007
@@ -29,7 +29,8 @@
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.exception.InternalErrorException;
public class DefaultExchangeRegistry implements ExchangeRegistry
{
@@ -65,7 +66,13 @@
_exchangeMap.put(exchange.getName(), exchange);
if(exchange.isDurable())
{
- getMessageStore().createExchange(exchange);
+ try
+ {
+ getMessageStore().createExchange(exchange);
+ } catch (InternalErrorException e)
+ {
+ throw new AMQException("problem registering excahgne " + exchange, e);
+ }
}
}
@@ -87,7 +94,13 @@
{
if(e.isDurable())
{
- getMessageStore().removeExchange(e);
+ try
+ {
+ getMessageStore().removeExchange(e);
+ } catch (InternalErrorException e1)
+ {
+ throw new AMQException("Problem unregistering Exchange " + name, e1);
+ }
}
e.close();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Wed May 2 09:49:03 2007
@@ -49,7 +49,8 @@
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(),
+ final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getTransactionManager(),
+ virtualHost.getMessageStore(),
virtualHost.getExchangeRegistry());
session.addChannel(channel);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Wed May 2 09:49:03 2007
@@ -42,9 +42,11 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueAlreadyExistsException;
import org.apache.commons.configuration.Configuration;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
@@ -103,7 +105,13 @@
queue = createQueue(body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
- store.createQueue(queue);
+ try
+ {
+ store.createQueue(queue);
+ } catch (Exception e)
+ {
+ throw new AMQException("Problem when creating queue " + queue, e);
+ }
}
queueRegistry.registerQueue(queue);
if (autoRegister)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=534541&r1=534540&r2=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Wed May 2 09:49:03 2007
@@ -30,9 +30,11 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.QueueDoesntExistException;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
@@ -107,7 +109,13 @@
if (queue.isDurable())
{
- store.removeQueue(queue.getName());
+ try
+ {
+ store.destroyQueue(queue);
+ } catch (Exception e)
+ {
+ throw new AMQException("problem when destroying queue " + queue, e);
+ }
}
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java Wed May 2 09:49:03 2007
@@ -0,0 +1,169 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Skeleton MessageStore implementation: every method is a no-op or returns a fixed value.
+ * Created by Arnaud Simon
+ * Date: 26-Apr-2007, Time: 08:23:45
+ */
+public class MemoryMessageStore implements MessageStore
+{
+
+ public void removeExchange(Exchange exchange)
+ throws
+ InternalErrorException
+ {
+ // Stub: no behaviour implemented; the exchange argument is ignored.
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException
+ {
+ // Stub: no behaviour implemented; all arguments are ignored.
+ }
+
+ public void createExchange(Exchange exchange)
+ throws
+ InternalErrorException
+ {
+ // Stub: no behaviour implemented; the exchange argument is ignored.
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException
+ {
+ // Stub: no behaviour implemented; all arguments are ignored.
+ }
+
+ public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+ throws
+ InternalErrorException,
+ IllegalArgumentException
+ {
+ // Stub: no behaviour implemented; none of the configuration arguments is read or kept.
+ }
+
+ public void close()
+ throws
+ InternalErrorException
+ {
+ // Stub: no behaviour implemented; this call is a no-op.
+ }
+
+ public void createQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueAlreadyExistsException
+ {
+ // Stub: no behaviour implemented; neither declared exception is ever thrown here.
+ }
+
+ public void destroyQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException
+ {
+ // Stub: no behaviour implemented; neither declared exception is ever thrown here.
+ }
+
+ public void stage(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageAlreadyStagedException
+ {
+ // Stub: no behaviour implemented; the message argument is ignored.
+ }
+
+ public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ // Stub: no behaviour implemented; the supplied data is not copied or kept.
+ }
+
+ public byte[] loadContent(StorableMessage m, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ return new byte[0]; // Stub: always returns an empty array, whatever offset and size are.
+ }
+
+ public void destroy(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException
+ {
+ // Stub: no behaviour implemented; the message argument is ignored.
+ }
+
+ public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ // Stub: no behaviour implemented; none of the declared exceptions is ever thrown here.
+ }
+
+ public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException
+ {
+ // Stub: no behaviour implemented; none of the declared exceptions is ever thrown here.
+ }
+
+ public Collection<StorableQueue> getAllQueues()
+ throws
+ InternalErrorException
+ {
+ return null; // Stub: always returns null (not an empty collection), so callers must null-check.
+ }
+
+ public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+ throws
+ InternalErrorException
+ {
+ return null; // Stub: always returns null (not an empty collection), so callers must null-check.
+ }
+
+ public long getNewMessageId()
+ {
+ return 0; // Stub: always returns 0, so successive calls do not yield distinct ids.
+ }
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java Wed May 2 09:49:03 2007
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 17:34:02
+ */
+public interface MessageStore
+{
+ /**
+ * Create a new exchange
+ *
+ * @param exchange the exchange to be persisted
+ * @throws InternalErrorException If an error occurs
+ */
+ public void createExchange(Exchange exchange)
+ throws
+ InternalErrorException;
+
+ /**
+ * Remove an exchange
+ * @param exchange The exchange to be removed
+ * @throws InternalErrorException If an error occurs
+ */
+ public void removeExchange(Exchange exchange) throws
+ InternalErrorException;
+
+ /**
+ * Bind a queue with an exchange given a routing key
+ *
+ * @param exchange The exchange to bind the queue with
+ * @param routingKey The routing key
+ * @param queue The queue to be bound
+ * @param args The binding arguments
+ * @throws InternalErrorException If an error occurs
+ */
+ public void bindQueue(Exchange exchange,
+ AMQShortString routingKey,
+ StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException;
+
+ /**
+ * Unbind a queue from an exchange
+ *
+ * @param exchange The exchange the queue was bound to
+ * @param routingKey The routing key
+ * @param queue The queue to unbind
+ * @param args The binding arguments
+ * @throws InternalErrorException If an error occurs
+ */
+ public void unbindQueue(Exchange exchange,
+ AMQShortString routingKey,
+ StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException;
+
+ /**
+ * Called after instantiation in order to configure the message store. A particular implementation can define
+ * whatever parameters it wants.
+ *
+ * @param virtualHost The virtual host used by this store
+ * @param tm The transaction manager implementation
+ * @param base The base element identifier from which all configuration items are relative. For example, if the base
+ * element is "store", then all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object
+ * @throws InternalErrorException If an error occurs that means the store is unable to configure itself
+ * @throws IllegalArgumentException If the configuration arguments are illegal
+ */
+ void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+ throws
+ InternalErrorException,
+ IllegalArgumentException;
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws InternalErrorException if close fails
+ */
+ void close()
+ throws
+ InternalErrorException;
+
+ /**
+ * Create a queue
+ *
+ * @param queue the queue to be created
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueAlreadyExistsException If the queue already exists in the store
+ */
+ public void createQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueAlreadyExistsException;
+
+ /**
+ * Destroy a queue
+ *
+ * @param queue The queue to be destroyed
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ */
+ public void destroyQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException;
+
+ /**
+ * Stage the message before effective enqueue
+ *
+ * @param m The message to stage
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageAlreadyStagedException If the message is already staged
+ */
+ public void stage(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageAlreadyStagedException;
+
+
+ /**
+ * Append more data to a previously staged message
+ *
+ * @param m The message to which data must be appended
+ * @param data Data to append to the message
+ * @param offset The number of bytes from the beginning of the payload
+ * @param size The number of bytes to be written
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message has not been staged
+ */
+ public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Get the content of a previously staged or enqueued message.
+ * The message headers are also set.
+ *
+ * @param m The message for which the content must be loaded
+ * @param offset The number of bytes from the beginning of the payload
+ * @param size The number of bytes to be loaded
+ * @return The message content
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist
+ */
+ public byte[] loadContent(StorableMessage m, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Destroy a previously staged message
+ *
+ * @param m the message to be destroyed
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist in the store
+ */
+ public void destroy(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Enqueue a message under the scope of the transaction branch
+ * identified by xid when specified.
+ * <p> This operation is propagated to the queue and the message.
+ * <p> A message that has been previously staged is assumed to have had
+ * its payload already added (see appendContent)
+ *
+ * @param xid The xid of the transaction branch under which the message must be enqueued.
+ * <p> If the xid is null then the message is enqueued outside the scope of any transaction.
+ * @param m The message to be enqueued
+ * @param queue The queue into which the message must be enqueued
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ * @throws InvalidXidException The transaction branch is invalid
+ * @throws UnknownXidException The transaction branch is unknown
+ * @throws MessageDoesntExistException If the Message does not exist
+ */
+ public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException;
+
+ /**
+ * Dequeue a message under the scope of the transaction branch identified by xid
+ * if specified.
+ * <p> This operation is propagated to the queue and the message.
+ *
+ * @param xid The xid of the transaction branch under which the message must be dequeued.
+ * <p> If the xid is null then the message is dequeued outside the scope of any transaction.
+ * @param m The message to be dequeued
+ * @param queue The queue from which the message must be dequeued
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ * @throws InvalidXidException The transaction branch is invalid
+ * @throws UnknownXidException The transaction branch is unknown
+ */
+ public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException;
+
+ //=========================================================
+ // Recovery specific methods
+ //=========================================================
+
+ /**
+ * List all the persistent queues
+ *
+ * @return All the persistent queues
+ * @throws InternalErrorException In case of internal message store problem
+ */
+ public Collection<StorableQueue> getAllQueues()
+ throws
+ InternalErrorException;
+
+ /**
+ * List all enqueued messages of a given queue
+ *
+ * @param queue The queue from which the messages are retrieved
+ * @return The list of all enqueued messages of the given queue
+ * @throws InternalErrorException In case of internal message store problem
+ */
+ public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+ throws
+ InternalErrorException;
+
+ /**
+ * Get a new message ID
+ *
+ * @return A new message ID
+ */
+ public long getNewMessageId();
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableMessage.java Wed May 2 09:49:03 2007
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+/**
+ * A storable message can be persisted in the message store.
+ *
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 08:56:48
+ */
+public interface StorableMessage
+{
+ /**
+ * The message ID is used by the store to identify a message.
+ *
+ * @return The message identifier
+ */
+ public long getMessageId();
+
+ /**
+ * Get the message header body that is saved when the message is staged.
+ *
+ * @return The message header body
+ */
+ public byte[] getHeaderBody();
+
+ /**
+ * Get the message header body size in bytes.
+ *
+ * @return The message header body size
+ */
+ public int getHeaderSize();
+
+ /**
+ * Get the message payload. This is required when the message is
+ * enqueued without any prior staging.
+ * <p> When the message is staged, the payload can be partial or even empty.
+ *
+ * @return The message payload
+ */
+ public byte[] getData();
+
+ /**
+ * Get the message payload size in bytes.
+ *
+ * @return The message payload size in bytes
+ */
+ public int getPayloadSize();
+
+ /**
+ * Specify whether this message has been enqueued
+ *
+ * @return true if this message is enqueued, false otherwise
+ */
+ public boolean isEnqueued();
+
+ /**
+ * This is called by the message store when this message is enqueued in the message store.
+ *
+ * @param queue The storable queue into which the message is enqueued
+ */
+ public void enqueue(StorableQueue queue);
+
+ /**
+ * This is called by the message store when this message is dequeued.
+ *
+ * @param queue The storable queue out of which the message is dequeued
+ */
+ public void dequeue(StorableQueue queue);
+
+ /**
+ * A message can be enqueued in several queues.
+ * The queue position represents the index of the provided queue within the ordered
+ * list of queues into which the message has been enqueued.
+ * <p>For example:
+ * <p> If the message is successively enqueued in queue Q1, Q2 and Q3 then
+ * the position of Q1 is 0, position of Q2 is 1 and position of Q3 is 2.
+ * <p> If the message is dequeued from Q2 then position of Q1 is still 0 but position
+ * of Q3 becomes 1.
+ *
+ * @param queue The storable queue for which the position should be determined
+ *
+ * @return The position of the specified storable queue
+ */
+ public int getQueuePosition(StorableQueue queue);
+
+ /**
+ * Indicates whether this message has been staged.
+ *
+ * @return True if the message has been staged, false otherwise
+ */
+ public boolean isStaged();
+
+ /**
+ * Called by the message store when this message is staged.
+ */
+ public void staged();
+
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java?view=auto&rev=534541
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/StorableQueue.java Wed May 2 09:49:03 2007
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * A storable queue can store storable messages and can be persisted in the store.
+ * Created by Arnaud Simon
+ * Date: 03-Apr-2007
+ * Time: 08:52:18
+ */
+public interface StorableQueue
+{
+ /**
+ * Get this queue's unique ID.
+ *
+ * @return The queue ID
+ */
+ public int getQueueID();
+
+ /**
+ * Set the queue ID.
+ *
+ * @param id This queue's ID
+ */
+ public void setQueueID(int id);
+
+ /**
+ * Get this queue's owner.
+ *
+ * @return This queue's owner
+ */
+ public AMQShortString getOwner();
+
+ /**
+ * Get this queue's name.
+ *
+ * @return the name of this queue
+ */
+ public AMQShortString getName();
+
+ /**
+ * Signifies to this queue that a message is dequeued.
+ * This operation is called by the store.
+ *
+ * @param m The dequeued message
+ */
+ public void dequeue(StorableMessage m);
+
+ /**
+ * Signifies to this queue that a message is enqueued.
+ * This operation is called by the store.
+ *
+ * @param m The enqueued message
+ */
+ public void enqueue(StorableMessage m);
+
+}