You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/25 23:59:05 UTC

svn commit: r829675 [7/11] - in /qpid/trunk/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/ broker/bin/ broker/src/main/java/org/apac...

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java Sun Oct 25 22:58:57 2009
@@ -31,10 +31,11 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.security.access.ACLPluginFactory;
 import org.apache.qpid.server.security.access.plugins.AbstractACLPlugin;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.util.NetMatcher;
 
@@ -57,7 +58,7 @@
             return plugin;
         }
     };
-    
+
     public class FirewallRule
     {
 
@@ -69,13 +70,13 @@
         public FirewallRule(String access, List networks, List hostnames)
         {
             _access = (access.equals("allow")) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-            
+
             if (networks != null && networks.size() > 0)
             {
                 String[] networkStrings = objListToStringArray(networks);
                 _network = new NetMatcher(networkStrings);
             }
-            
+
             if (hostnames != null && hostnames.size() > 0)
             {
                 int i = 0;
@@ -85,7 +86,7 @@
                     _hostnamePatterns[i++] = Pattern.compile(hostname);
                 }
             }
-            
+
         }
 
         private String[] objListToStringArray(List objList)
@@ -147,7 +148,7 @@
 
             thread.run();
             long endTime = System.currentTimeMillis() + DNS_TIMEOUT;
-            
+
             while (System.currentTimeMillis() < endTime && !done.get())
             {
                 try
@@ -176,8 +177,15 @@
     private FirewallRule[] _rules;
 
     @Override
-    public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
+    public AuthzResult authoriseConnect(PrincipalHolder principalHolder, VirtualHost virtualHost)
     {
+        if(!(principalHolder instanceof ProtocolEngine))
+        {
+            return AuthzResult.ABSTAIN; // We only deal with tcp sessions
+        }
+
+        ProtocolEngine session = (ProtocolEngine) principalHolder;
+
         SocketAddress sockAddr = session.getRemoteAddress();
         if (!(sockAddr instanceof InetSocketAddress))
         {
@@ -228,7 +236,7 @@
             _default = AuthzResult.DENIED;
         }
         CompositeConfiguration finalConfig = new CompositeConfiguration(config);
-        
+
         List subFiles = config.getList("xml[@fileName]");
         for (Object subFile : subFiles)
         {
@@ -236,7 +244,7 @@
         }
 
         // all rules must have an access attribute
-        int numRules = finalConfig.getList("rule[@access]").size(); 
+        int numRules = finalConfig.getList("rule[@access]").size();
         _rules = new FirewallRule[numRules];
         for (int i = 0; i < numRules; i++)
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java Sun Oct 25 22:58:57 2009
@@ -215,14 +215,7 @@
                 _logger.warn("Unable to load access file:" + jmxaccesssFile);
             }
 
-            try
-            {
-                _mbean.register();
-            }
-            catch (AMQException e)
-            {
-                _logger.warn("Unable to register user management MBean");
-            }
+            _mbean.register();
         }
         catch (JMException e)
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java Sun Oct 25 22:58:57 2009
@@ -31,12 +31,12 @@
 {
     protected LogSubject _logSubject;
 
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration hostConfig) throws Exception
+    public void configure(VirtualHost virtualHost) throws Exception
     {
         _logSubject = new MessageStoreLogSubject(virtualHost, this);
         CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName()));
     }
-    
+
     public void close() throws Exception
     {
         CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003());

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Sun Oct 25 22:58:57 2009
@@ -21,28 +21,18 @@
 package org.apache.qpid.server.store;
 
 import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.Configuration;
+
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -56,15 +46,14 @@
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
 
 
-public class DerbyMessageStore extends AbstractMessageStore
+public class DerbyMessageStore implements MessageStore
 {
 
     private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -80,57 +69,66 @@
     private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
     private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
     private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
-    private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA";
+
+    private static final String META_DATA_TABLE_NAME = "QPID_META_DATA";
     private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
 
     private static final int DB_VERSION = 1;
 
 
 
-    private VirtualHost _virtualHost;
     private static Class<Driver> DRIVER_CLASS;
 
-    private final AtomicLong _messageId = new AtomicLong(1);
+    private final AtomicLong _messageId = new AtomicLong(0);
     private AtomicBoolean _closed = new AtomicBoolean(false);
 
     private String _connectionURL;
 
-    Map<AMQShortString, Integer> _queueRecoveries = new TreeMap<AMQShortString, Integer>();
-
+    private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
 
     private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
     private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
+
     private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
     private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
     private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
-    private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
-    private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
-    private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
     private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
     private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
     private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
     private static final String SELECT_FROM_BINDINGS =
-            "SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
+            "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name";
     private static final String FIND_BINDING =
             "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? ";
-    private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
-    private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
     private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
     private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
     private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
     private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
     private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)";
     private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
+
+    private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
     private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
     private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
-    private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
-    private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
-    private static final String SELECT_FROM_MESSAGE_META_DATA =
-            "SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
+    private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_name, message_id";
+
+
+    private static final String CREATE_META_DATA_TABLE = "CREATE TABLE "+META_DATA_TABLE_NAME+" ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
+    private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, offset int not null, last_byte int not null, content blob , PRIMARY KEY (message_id, offset) )";
+
+    private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, offset, last_byte, content ) values (?, ?, ?, ?)";
     private static final String SELECT_FROM_MESSAGE_CONTENT =
-            "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?";
-    private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME;
-    private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
+            "SELECT offset, content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? AND last_byte > ? AND offset < ? ORDER BY message_id, offset";
+    private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
+
+    private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";;
+    private static final String SELECT_FROM_META_DATA =
+            "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
+    private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
+    private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
+
+
+    private LogSubject _logSubject;
+    private boolean _configured;
 
 
     private enum State
@@ -146,21 +144,82 @@
     private State _state = State.INITIAL;
 
 
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    public void configureConfigStore(String name,
+                          ConfigurationRecoveryHandler recoveryHandler,
+                          Configuration storeConfiguration,
+                          LogSubject logSubject) throws Exception
     {
-        super.configure(virtualHost,base,config);
-
         stateTransition(State.INITIAL, State.CONFIGURING);
+        _logSubject = logSubject;
+        CurrentActor.get().message(_logSubject, ConfigStoreMessages.CFG_1001(this.getClass().getName()));
 
-        initialiseDriver();
+        if(!_configured)
+        {
+            commonConfiguration(name, storeConfiguration, logSubject);
+            _configured = true;
+        }
+
+        // this recovers durable exchanges, queues, and bindings
+        recover(recoveryHandler);
 
-        _virtualHost = virtualHost;
 
-        _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
-        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+        stateTransition(State.RECOVERING, State.STARTED);
+
+    }
+
+
+    public void configureMessageStore(String name,
+                          MessageStoreRecoveryHandler recoveryHandler,
+                          Configuration storeConfiguration,
+                          LogSubject logSubject) throws Exception
+    {
+        CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName()));
+
+        if(!_configured)
+        {
+
+            _logSubject = logSubject;
+
+            commonConfiguration(name, storeConfiguration, logSubject);
+            _configured = true;
+        }
+
+        recoverMessages(recoveryHandler);
+
+    }
+
+
+
+    public void configureTransactionLog(String name,
+                          TransactionLogRecoveryHandler recoveryHandler,
+                          Configuration storeConfiguration,
+                          LogSubject logSubject) throws Exception
+    {
+        CurrentActor.get().message(_logSubject, TransactionLogMessages.TXN_1001(this.getClass().getName()));
+
+        if(!_configured)
+        {
+
+            _logSubject = logSubject;
+
+            commonConfiguration(name, storeConfiguration, logSubject);
+            _configured = true;
+        }
+
+        recoverQueueEntries(recoveryHandler);
+
+    }
+
+
+
+    private void commonConfiguration(String name, Configuration storeConfiguration, LogSubject logSubject)
+            throws ClassNotFoundException, SQLException
+    {
+        initialiseDriver();
 
         //Update to pick up QPID_WORK and use that as the default location not just derbyDB
-        final String databasePath = config.getStoreConfiguration().getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")+"/derbyDB");
+
+        final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")+"/derbyDB");
 
         File environmentPath = new File(databasePath);
         if (!environmentPath.exists())
@@ -172,17 +231,9 @@
             }
         }
 
-        CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1002(environmentPath.getAbsolutePath()));
-
-        createOrOpenDatabase(databasePath);
-
-        // this recovers durable queues and persistent messages
-
-        recover();
-
-
-        stateTransition(State.RECOVERING, State.STARTED);
+        CurrentActor.get().message(logSubject, MessageStoreMessages.MST_1002(environmentPath.getAbsolutePath()));
 
+        createOrOpenDatabase(name, databasePath);
     }
 
     private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -193,10 +244,10 @@
         }
     }
 
-    private void createOrOpenDatabase(final String environmentPath) throws SQLException
+    private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
     {
         //fixme this the _vhost name should not be added here.
-        _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true";
+        _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true";
 
         Connection conn = newConnection();
 
@@ -205,7 +256,7 @@
         createQueueTable(conn);
         createBindingsTable(conn);
         createQueueEntryTable(conn);
-        createMessageMetaDataTable(conn);
+        createMetaDataTable(conn);
         createMessageContentTable(conn);
 
         conn.close();
@@ -276,12 +327,12 @@
 
     }
 
-    private void createMessageMetaDataTable(final Connection conn) throws SQLException
+        private void createMetaDataTable(final Connection conn) throws SQLException
     {
-        if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn))
+        if(!tableExists(META_DATA_TABLE_NAME, conn))
         {
             Statement stmt = conn.createStatement();
-            stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
+            stmt.execute(CREATE_META_DATA_TABLE);
 
             stmt.close();
         }
@@ -314,38 +365,22 @@
         return exists;
     }
 
-    public void recover() throws AMQException
+    public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
     {
         stateTransition(State.CONFIGURING, State.RECOVERING);
 
-        CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1004(null, false));
-
-        StoreContext context = new StoreContext();
 
         try
         {
-            Map<AMQShortString, AMQQueue> queues = loadQueues();
-
-            recoverExchanges();
-
-            try
-            {
-
-                beginTran(context);
+            ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
+            List<String> queues = loadQueues(qrh);
 
-                deliverMessages(context, queues);
-                commitTran(context);
+            ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+            List<String> exchanges = loadExchanges(erh);
+            ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+            recoverBindings(brh, exchanges);
+            brh.completeBindingRecovery();
 
-                //Recovery Complete
-                CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1006(null, false));
-            }
-            finally
-            {
-                if(inTran(context))
-                {
-                    abortTran(context);
-                }
-            }
 
         }
         catch (SQLException e)
@@ -357,53 +392,34 @@
 
     }
 
-    private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
+    private List<String> loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException, AMQException
     {
         Connection conn = newConnection();
 
 
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
-        Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
+        List<String> queues = new ArrayList<String>();
+
         while(rs.next())
         {
             String queueName = rs.getString(1);
             String owner = rs.getString(2);
-            AMQShortString queueNameShortString = new AMQShortString(queueName);
+            qrh.queue(queueName, owner, null);
 
-            AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
+            queues.add(queueName);
 
-            if (q == null)
-            {
-                q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
-                                                       null);
-                _virtualHost.getQueueRegistry().registerQueue(q);
-            }
-            
-            queueMap.put(queueNameShortString,q);
-            
-            CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1004(String.valueOf(q.getName()), true));
 
-            //Record that we have a queue for recovery
-            _queueRecoveries.put(new AMQShortString(queueName), 0);
-
-        }
-        return queueMap;
-    }
 
-    private void recoverExchanges() throws AMQException, SQLException
-    {
-        for (Exchange exchange : loadExchanges())
-        {
-            recoverExchange(exchange);
         }
+        return queues;
     }
 
 
-    private List<Exchange> loadExchanges() throws AMQException, SQLException
+    private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws AMQException, SQLException
     {
 
-        List<Exchange> exchanges = new ArrayList<Exchange>();
+        List<String> exchanges = new ArrayList<String>();
         Connection conn = null;
         try
         {
@@ -413,21 +429,15 @@
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
 
-            Exchange exchange;
             while(rs.next())
             {
                 String exchangeName = rs.getString(1);
                 String type = rs.getString(2);
                 boolean autoDelete = rs.getShort(3) != 0;
 
-                AMQShortString exchangeNameSS = new AMQShortString(exchangeName);
-                exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
-                if (exchange == null)
-                {
-                    exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
-                    _virtualHost.getExchangeRegistry().registerExchange(exchange);
-                }
-                exchanges.add(exchange);
+                exchanges.add(exchangeName);
+
+                erh.exchange(exchangeName, type, autoDelete);
 
             }
             return exchanges;
@@ -443,11 +453,13 @@
 
     }
 
-    private void recoverExchange(Exchange exchange) throws AMQException, SQLException
+    private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws AMQException, SQLException
     {
-        _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
 
-        QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+        _logger.info("Recovering bindings...");
+
+
 
         Connection conn = null;
         try
@@ -455,41 +467,29 @@
             conn = newConnection();
 
             PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
-            stmt.setString(1, exchange.getName().toString());
 
             ResultSet rs = stmt.executeQuery();
 
 
             while(rs.next())
             {
-                String queueName = rs.getString(1);
-                String bindingKey = rs.getString(2);
-                Blob arguments = rs.getBlob(3);
-
+                String exchangeName = rs.getString(1);
+                String queueName = rs.getString(2);
+                String bindingKey = rs.getString(3);
+                Blob arguments = rs.getBlob(4);
+                java.nio.ByteBuffer buf;
 
-                AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
-                if (queue == null)
+                if(arguments != null  && arguments.length() != 0)
                 {
-                    _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
-                        + exchange.getName());
+                    byte[] argumentBytes = arguments.getBytes(1, (int) arguments.length());
+                    buf = java.nio.ByteBuffer.wrap(argumentBytes);
                 }
                 else
                 {
-                    _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
-                        + ", Routing Key: " + bindingKey + ", Arguments: " + arguments
-                        + ")");
-
-                    FieldTable argumentsFT = null;
-                    if(arguments != null)
-                    {
-                        byte[] argumentBytes = arguments.getBytes(0, (int) arguments.length());
-                        ByteBuffer buf = ByteBuffer.wrap(argumentBytes);
-                        argumentsFT = new FieldTable(buf,arguments.length());
-                    }
-
-                    queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
-
+                    buf = null;
                 }
+
+                brh.binding(exchangeName, queueName, bindingKey, buf);
             }
         }
         finally
@@ -501,44 +501,48 @@
         }
     }
 
+
+
     public void close() throws Exception
     {
+        CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003());
         _closed.getAndSet(true);
-        
-        super.close();        
     }
 
-    public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+    public StoredMessage addMessage(StorableMessageMetaData metaData)
     {
-
-        boolean localTx = getOrCreateTransaction(storeContext);
-
-        Connection conn = getConnection(storeContext);
-        ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload();
-
-
-        if (_logger.isDebugEnabled())
+        if(metaData.isPersistent())
         {
-            _logger.debug("Message Id: " + messageId + " Removing");
+            return new StoredDerbyMessage(_messageId.incrementAndGet(), metaData);
         }
+        else
+        {
+            return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData);
+        }
+    }
 
-        // first we need to look up the header to get the chunk count
-        MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
+    public StoredMessage getMessage(long messageNumber)
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void removeMessage(long messageId)
+    {
+        Connection conn = null;
         try
         {
-            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
+
+
+            conn = newConnection();
+            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA);
             stmt.setLong(1,messageId);
-            wrapper.setRequiresCommit();
             int results = stmt.executeUpdate();
 
             if (results == 0)
             {
-                if (localTx)
-                {
-                    abortTran(storeContext);
-                }
 
-                throw new AMQException("Message metadata not found for message id " + messageId);
+
+                throw new RuntimeException("Message metadata not found for message id " + messageId);
             }
             stmt.close();
 
@@ -551,29 +555,27 @@
             stmt.setLong(1,messageId);
             results = stmt.executeUpdate();
 
-            if(results != mmd.getContentChunkCount())
-            {
-                if (localTx)
-                {
-                    abortTran(storeContext);
-                }
-                throw new AMQException("Unexpected number of content chunks when deleting message.  Expected " + mmd.getContentChunkCount() + " but found " + results);
 
-            }
 
-            if (localTx)
-            {
-                commitTran(storeContext);
-            }
+            conn.commit();
+            conn.close();
         }
         catch (SQLException e)
         {
-            if ((conn != null) && localTx)
+            if ((conn != null))
             {
-                abortTran(storeContext);
+                try
+                {
+                    conn.rollback();
+                    conn.close();
+                }
+                catch (SQLException e1)
+                {
+
+                }
             }
 
-            throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+            throw new RuntimeException("Error removing Message with id " + messageId + " to database: " + e, e);
         }
 
     }
@@ -802,8 +804,14 @@
                 {
                     stmt = conn.prepareStatement(INSERT_INTO_QUEUE);
 
+                    String owner = queue.getPrincipalHolder() == null
+                                   ? null
+                                   : queue.getPrincipalHolder().getPrincipal() == null
+                                     ? null
+                                     : queue.getPrincipalHolder().getPrincipal().getName();
+
                     stmt.setString(1, queue.getName().toString());
-                    stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
+                    stmt.setString(2, owner);
 
                     stmt.execute();
 
@@ -873,29 +881,26 @@
 
     }
 
-    public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public Transaction newTransaction()
     {
-        AMQShortString name = queue.getName();
+        return new DerbyTransaction();  // the constructor opens its own connection; an SQLException there is rethrown as RuntimeException
+    }
+
+    public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQException
+    {
+        String name = queue.getResourceName();
+
+        Connection conn = connWrapper.getConnection();
 
-        boolean localTx = getOrCreateTransaction(context);
-        Connection conn = getConnection(context);
-        ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
 
         try
         {
             PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
-            stmt.setString(1,name.toString());
+            stmt.setString(1,name);
             stmt.setLong(2,messageId);
             stmt.executeUpdate();
             connWrapper.requiresCommit();
 
-            if(localTx)
-            {
-                commitTran(context);
-            }
-
-
-
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
@@ -903,10 +908,6 @@
         }
         catch (SQLException e)
         {
-            if(localTx)
-            {
-                abortTran(context);
-            }
             _logger.error("Failed to enqueue: " + e, e);
             throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
                 + " to database", e);
@@ -914,18 +915,18 @@
 
     }
 
-    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource  queue, Long messageId) throws AMQException
     {
-        AMQShortString name = queue.getName();
+        String name = queue.getResourceName();
+
+
+        Connection conn = connWrapper.getConnection();
 
-        boolean localTx = getOrCreateTransaction(context);
-        Connection conn = getConnection(context);
-        ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
 
         try
         {
             PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
-            stmt.setString(1,name.toString());
+            stmt.setString(1,name);
             stmt.setLong(2,messageId);
             int results = stmt.executeUpdate();
 
@@ -936,13 +937,6 @@
                 throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
             }
 
-            if(localTx)
-            {
-                commitTran(context);
-            }
-
-
-
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Dequeuing message " + messageId + " on queue " + name );//+ "[Connection" + conn + "]");
@@ -950,10 +944,6 @@
         }
         catch (SQLException e)
         {
-            if(localTx)
-            {
-                abortTran(context);
-            }
             _logger.error("Failed to dequeue: " + e, e);
             throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
                 + " from database", e);
@@ -987,51 +977,20 @@
         }
     }
 
-    public void beginTran(StoreContext context) throws AMQException
-    {
-        if (context.getPayload() != null)
-        {
-            throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
-                + context.getPayload());
-        }
-        else
-        {
-            try
-            {
-                Connection conn = newConnection();
-
 
-                context.setPayload(new ConnectionWrapper(conn));
-            }
-            catch (SQLException e)
-            {
-                throw new AMQException("Error starting transaction: " + e, e);
-            }
-        }
-    }
-
-    public void commitTran(StoreContext context) throws AMQException
+    public void commitTran(ConnectionWrapper connWrapper) throws AMQException
     {
-        ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
-
-        if (connWrapper == null)
-        {
-            throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
-        }
 
         try
         {
             Connection conn = connWrapper.getConnection();
-            if(connWrapper.requiresCommit())
-            {
-                conn.commit();
-
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("commit tran completed");
-                }
+            conn.commit();
 
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("commit tran completed");
             }
+
             conn.close();
         }
         catch (SQLException e)
@@ -1040,14 +999,30 @@
         }
         finally
         {
-            context.setPayload(null);
+
         }
     }
 
-    public void abortTran(StoreContext context) throws AMQException
+    public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQException
     {
-        ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+        commitTran(connWrapper);  // the commit itself is performed synchronously, before anything is returned
+        return new StoreFuture()  // hence the future handed back is already complete
+                    {
+                        public boolean isComplete()
+                        {
+                            return true;
+                        }
+                        // No-op: commitTran() has already returned by the time this future exists.
+                        public void waitForCompletion()
+                        {
 
+                        }
+                    };
+
+    }
+
+    public void abortTran(ConnectionWrapper connWrapper) throws AMQException
+    {
         if (connWrapper == null)
         {
             throw new AMQException("Fatal internal error: transactional context is empty at abortTran");
@@ -1072,272 +1047,261 @@
         {
             throw new AMQException("Error aborting transaction: " + e, e);
         }
-        finally
-        {
-            context.setPayload(null);
-        }
+
     }
 
-    public boolean inTran(StoreContext context)
+    public Long getNewMessageId()
     {
-        return context.getPayload() != null;
+        return _messageId.incrementAndGet();  // increments first, then returns the new value (boxed to Long)
     }
 
-    public Long getNewMessageId()
+    // Executes INSERT_INTO_META_DATA for messageId with a blob laid out as [type-ordinal byte][serialised meta data]; does not commit.
+    private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
+        throws SQLException
     {
-        return _messageId.getAndIncrement();
+        PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
+        stmt.setLong(1,messageId);
+        // Byte 0 is reserved for the meta-data type tag; the payload follows it.
+        final int bodySize = 1 + metaData.getStorableSize();
+        byte[] underlying = new byte[bodySize];
+        underlying[0] = (byte) metaData.getType().ordinal();
+        java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+        buf.position(1);
+        buf = buf.slice();
+        // buf is now a view of underlying starting at index 1, so writing at offset 0 lands just after the tag.
+        metaData.writeToBuffer(0, buf);
+        ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+        stmt.setBinaryStream(2,bis,underlying.length);
+        stmt.executeUpdate();
+        stmt.close();
     }
 
-    public void storeContentBodyChunk(StoreContext context,
-                                      Long messageId,
-                                      int index,
-                                      ContentChunk contentBody,
-                                      boolean lastContentBody) throws AMQException
-    {
-        boolean localTx = getOrCreateTransaction(context);
-        Connection conn = getConnection(context);
-        ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
 
-        try
-        {
-            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
-            stmt.setLong(1,messageId);
-            stmt.setInt(2, index);
-            byte[] chunkData = new byte[contentBody.getSize()];
-            contentBody.getData().duplicate().get(chunkData);
-            /* this would be the Java 6 way of doing things
-            Blob dataAsBlob = conn.createBlob();
-            dataAsBlob.setBytes(1L, chunkData);
-            stmt.setBlob(3, dataAsBlob);
-            */
-            ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
-            stmt.setBinaryStream(3, bis, chunkData.length);
-            stmt.executeUpdate();
-            connWrapper.requiresCommit();
 
-            if(localTx)
-            {
-                commitTran(context);
-            }
-        }
-        catch (SQLException e)
+    // Replays every row returned by SELECT_ALL_FROM_META_DATA to the recovery handler, then resets the id counter.
+    private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
+    {
+        Connection conn = newConnection();
+        // NOTE(review): conn, stmt and rs are never closed in this method - confirm whether that is intended.
+        MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
+
+        Statement stmt = conn.createStatement();
+        ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
+
+        long maxId = 0;
+
+        while(rs.next())
         {
-            if(localTx)
+
+            long messageId = rs.getLong(1);
+            Blob dataAsBlob = rs.getBlob(2);
+            // Track the largest id seen so that new ids continue after the recovered ones.
+            if(messageId > maxId)
             {
-                abortTran(context);
+                maxId = messageId;
             }
 
-            throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+            byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+            java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+            buf.position(1);
+            buf = buf.slice();
+            MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];  // byte 0 holds the type ordinal; buf views the remaining bytes
+            StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+            StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false);  // persist=false: skip re-inserting the row just read
+            messageHandler.message(message);
+
+
         }
 
+        _messageId.set(maxId);
+        // Ids are allocated with incrementAndGet(), so the next id handed out is maxId + 1.
+        messageHandler.completeMessageRecovery();
     }
 
-    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd)
-            throws AMQException
-    {
 
-        boolean localTx = getOrCreateTransaction(context);
-        Connection conn = getConnection(context);
-        ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
 
-        try
-        {
+    private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+    {   // Replays every (queue name, message id) row returned by SELECT_FROM_QUEUE_ENTRY to the recovery handler.
+        Connection conn = newConnection();  // NOTE(review): conn, stmt and rs are never closed in this method - confirm whether that is intended
 
-            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
-            stmt.setLong(1,messageId);
-            stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
-            stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
-            stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0);
-            stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0);
-
-            ContentHeaderBody headerBody = mmd.getContentHeaderBody();
-            final int bodySize = headerBody.getSize();
-            byte[] underlying = new byte[bodySize];
-            ByteBuffer buf = ByteBuffer.wrap(underlying);
-            headerBody.writePayload(buf);
-/*
-            Blob dataAsBlob = conn.createBlob();
-            dataAsBlob.setBytes(1L, underlying);
-            stmt.setBlob(6, dataAsBlob);
-*/
-            ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
-            stmt.setBinaryStream(6,bis,underlying.length);
+        TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
 
-            stmt.setInt(7, mmd.getContentChunkCount());
+        Statement stmt = conn.createStatement();
+        ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
 
-            stmt.executeUpdate();
-            connWrapper.requiresCommit();
 
-            if(localTx)
-            {
-                commitTran(context);
-            }
-        }
-        catch (SQLException e)
+        while(rs.next())
         {
-            if(localTx)
-            {
-                abortTran(context);
-            }
 
-            throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+            String queueName = rs.getString(1);
+            long messageId = rs.getLong(2);
+            queueEntryHandler.queueEntry(queueName,messageId);
         }
 
 
+        // All rows have been handed over; tell the handler the replay is finished.
+        queueEntryHandler.completeQueueEntryRecovery();
+
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    StorableMessageMetaData getMetaData(long messageId) throws SQLException  // loads and decodes the stored meta data for one message id
     {
-        boolean localTx = getOrCreateTransaction(context);
-        Connection conn = getConnection(context);
-
 
+        Connection conn = newConnection();  // dedicated connection for this lookup; closed in the finally block below
         try
         {
-
-            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
+            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA);
             stmt.setLong(1,messageId);
             ResultSet rs = stmt.executeQuery();
 
             if(rs.next())
             {
-                final AMQShortString exchange = new AMQShortString(rs.getString(1));
-                final AMQShortString routingKey = rs.getString(2) == null ? null : new AMQShortString(rs.getString(2));
-                final boolean mandatory = (rs.getShort(3) != (short)0);
-                final boolean immediate = (rs.getShort(4) != (short)0);
-                MessagePublishInfo info = new MessagePublishInfo()
-                                            {
-
-                                                public AMQShortString getExchange()
-                                                {
-                                                    return exchange;
-                                                }
-
-                                                public void setExchange(AMQShortString exchange)
-                                                {
-
-                                                }
-
-                                                public boolean isImmediate()
-                                                {
-                                                    return immediate;
-                                                }
-
-                                                public boolean isMandatory()
-                                                {
-                                                    return mandatory;
-                                                }
-
-                                                public AMQShortString getRoutingKey()
-                                                {
-                                                    return routingKey;
-                                                }
-                                            }   ;
 
-                Blob dataAsBlob = rs.getBlob(5);
+                Blob dataAsBlob = rs.getBlob(1);
 
                 byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
-                ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+                java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+                buf.position(1);
+                buf = buf.slice();  // view of the bytes after the leading type tag
+                MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];  // byte 0 is the type ordinal written by storeMetaData
+                StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
 
-                ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length);
+                return metaData;
+            }
+            else
+            {
+                throw new RuntimeException("Meta data not found for message with id " + messageId);  // unchecked: no row matched the id
+            }
+
+        }
+        finally
+        {
+            conn.close();
+        }
+    }
+    // Inserts one content chunk (the remaining bytes of src) for messageId at the given offset.
+    // If conn is null a new connection is opened, committed and closed here; otherwise the caller's connection is used and left uncommitted.
+    private void addContent(Connection conn, long messageId, int offset, ByteBuffer src)
+    {
 
-                if(localTx)
-                {
-                    commitTran(context);
-                }
 
-                return new MessageMetaData(info, chb, rs.getInt(6));
 
+        try
+        {
+            final boolean newConnection = conn == null;
+
+            if(newConnection)
+            {
+                conn = newConnection();
             }
-            else
+
+            src = src.slice();
+            // Copy via a duplicate so the position of the sliced buffer is left untouched.
+            byte[] chunkData = new byte[src.limit()];
+            src.duplicate().get(chunkData);
+
+            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
+            stmt.setLong(1,messageId);
+            stmt.setInt(2, offset);
+            stmt.setInt(3, offset+chunkData.length);
+
+
+            /* this would be the Java 6 way of doing things
+            Blob dataAsBlob = conn.createBlob();
+            dataAsBlob.setBytes(1L, chunkData);
+            stmt.setBlob(4, dataAsBlob);
+            */
+            ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
+            stmt.setBinaryStream(4, bis, chunkData.length);
+            stmt.executeUpdate();
+            stmt.close();
+            if(newConnection)
             {
-                if(localTx)
-                {
-                    abortTran(context);
-                }
-                throw new AMQException("Metadata not found for message with id " + messageId);
+                conn.commit();
+                conn.close();
             }
         }
         catch (SQLException e)
         {
-            if(localTx)
+            if(conn != null)
             {
-                abortTran(context);
+                try
+                {
+                    conn.close();
+                }
+                catch (SQLException e1)
+                {
+                    // Ignored on purpose: the original failure e is rethrown below.
+                }
             }
 
-            throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+            throw new RuntimeException("Error writing content for message with id " + messageId + " to database: " + e, e);
         }
 
 
     }
 
-    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
-    {
-        boolean localTx = getOrCreateTransaction(context);
-                Connection conn = getConnection(context);
-
 
-                try
-                {
+    public int getContent(long messageId, int offset, ByteBuffer dst)
+    {
+        Connection conn = null;
 
-                    PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
-                    stmt.setLong(1,messageId);
-                    stmt.setInt(2, index);
-                    ResultSet rs = stmt.executeQuery();
 
-                    if(rs.next())
-                    {
-                        Blob dataAsBlob = rs.getBlob(1);
+        try
+        {
+            conn = newConnection();
 
-                        final int size = (int) dataAsBlob.length();
-                        byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
-                        final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+            stmt.setLong(1,messageId);
+            stmt.setInt(2, offset);
+            stmt.setInt(3, offset+dst.remaining());
+            ResultSet rs = stmt.executeQuery();
 
-                        ContentChunk cb = new ContentChunk()
-                        {
+            int written = 0;
 
-                            public int getSize()
-                            {
-                                return size;
-                            }
-
-                            public ByteBuffer getData()
-                            {
-                                return buf;
-                            }
+            while(rs.next())
+            {
+                int offsetInMessage = rs.getInt(1);
+                Blob dataAsBlob = rs.getBlob(2);
 
-                            public void reduceToFit()
-                            {
+                final int size = (int) dataAsBlob.length();
+                byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
 
-                            }
-                        };
+                int posInArray = offset + written - offsetInMessage;
+                int count = size - posInArray;
+                if(count > dst.remaining())
+                {
+                    count = dst.remaining();
+                }
+                dst.put(dataAsBytes,posInArray,count);
+                written+=count;
 
-                        if(localTx)
-                        {
-                            commitTran(context);
-                        }
+                if(dst.remaining() == 0)
+                {
+                    break;
+                }
+            }
 
-                        return cb;
+            conn.close();
+            return written;
 
-                    }
-                    else
-                    {
-                        if(localTx)
-                        {
-                            abortTran(context);
-                        }
-                        throw new AMQException("Message not found for message with id " + messageId);
-                    }
+        }
+        catch (SQLException e)
+        {
+            if(conn != null)
+            {
+                try
+                {
+                    conn.close();
                 }
-                catch (SQLException e)
+                catch (SQLException e1)
                 {
-                    if(localTx)
-                    {
-                        abortTran(context);
-                    }
 
-                    throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
                 }
+            }
+
+            throw new RuntimeException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+        }
 
 
 
@@ -1348,173 +1312,158 @@
         return true;
     }
 
-    private void checkNotClosed() throws MessageStoreClosedException
-    {
-        if (_closed.get())
-        {
-            throw new MessageStoreClosedException();
-        }
-    }
-
 
-    private static final class ProcessAction
+    private synchronized void stateTransition(State requiredState, State newState) throws AMQException  // guarded move of _state from requiredState to newState
     {
-        private final AMQQueue _queue;
-        private final StoreContext _context;
-        private final AMQMessage _message;
-
-        public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
-        {
-            _queue = queue;
-            _context = context;
-            _message = message;
-        }
-
-        public void process() throws AMQException
+        if (_state != requiredState)  // refuse the transition unless the store is currently in the expected state
         {
-            _queue.enqueue(_context, _message);
+            throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+                + "; currently in state: " + _state);
         }
 
+        _state = newState;  // check and assignment both happen under the method's monitor
     }
 
 
-    private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
-        throws SQLException, AMQException
+    private class DerbyTransaction implements Transaction  // Transaction that forwards every call to the enclosing store using one wrapped connection
     {
-        Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
-        List<ProcessAction> actions = new ArrayList<ProcessAction>();
+        private final ConnectionWrapper _connWrapper;  // wraps the connection opened in the constructor; passed to every store call below
 
 
-        final boolean inLocaltran = inTran(context);
-        Connection conn = null;
-        try
+        private DerbyTransaction()  // opens a new connection up front; failure surfaces as an unchecked exception
         {
-            if(inLocaltran)
+            try
             {
-                conn = getConnection(context);
+                _connWrapper = new ConnectionWrapper(newConnection());
             }
-            else
+            catch (SQLException e)
             {
-                conn = newConnection();
+                throw new RuntimeException(e);
             }
+        }
 
-            MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
-            long maxId = 1;
+        public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+        {
+            DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId);  // delegates to the store, on this transaction's connection
+        }
 
-            TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
+        public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+        {
+            DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId);  // delegates to the store, on this transaction's connection
 
-            Statement stmt = conn.createStatement();
-            ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+        }
 
-            while (rs.next())
-            {
-                AMQShortString queueName = new AMQShortString(rs.getString(1));
+        public void commitTran() throws AMQException
+        {
+            DerbyMessageStore.this.commitTran(_connWrapper);  // the store's commitTran commits and then closes the connection
+        }
 
-                AMQQueue queue = queues.get(queueName);
-                if (queue == null)
-                {
-                    queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
+        public StoreFuture commitTranAsync() throws AMQException
+        {
+            return DerbyMessageStore.this.commitTranAsync(_connWrapper);  // the store commits synchronously and returns an already-complete future
+        }
 
-                    _virtualHost.getQueueRegistry().registerQueue(queue);
-                    queues.put(queueName, queue);
+        public void abortTran() throws AMQException
+        {
+            DerbyMessageStore.this.abortTran(_connWrapper);  // delegates to the store's abortTran for this transaction's connection
+        }
+    }
 
-                    //Log Recovery Start
-                    CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1004(String.valueOf(queue.getName()), true));
-                }
+    private class StoredDerbyMessage implements StoredMessage  // StoredMessage handle whose meta data and content are read from / written to the enclosing store
+    {
 
-                long messageId = rs.getLong(2);
-                maxId = Math.max(maxId, messageId);
-                AMQMessage message = msgMap.get(messageId);
+        private final long _messageId;
+        private volatile WeakReference<StorableMessageMetaData> _metaDataRef;  // weakly held; reloaded from the store by getMetaData() once cleared
+        private Connection _conn;  // non-null only between a persisting constructor call and the next flushToStore()
 
-                if(message != null)
-                {
-                    message.incrementReference();
-                }
-                else
-                {
-                    message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
-                    msgMap.put(messageId,message);
-                }
+        StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
+        {
+            this(messageId, metaData, true);  // new messages are always persisted
+        }
 
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName());
-                }
 
-                Integer count = _queueRecoveries.get(queueName);
-                if (count == null)
+        StoredDerbyMessage(long messageId,
+                           StorableMessageMetaData metaData, boolean persist)
+        {
+            try
+            {
+                _messageId = messageId;
+                // When persist is true the meta data is written on a new, still-uncommitted connection kept in _conn.
+                _metaDataRef = new WeakReference<StorableMessageMetaData>(metaData);
+                if(persist)
                 {
-                    count = 0;
+                    _conn = newConnection();
+                    storeMetaData(_conn, messageId, metaData);
                 }
-
-                _queueRecoveries.put(queueName, ++count);
-
-                actions.add(new ProcessAction(queue, context, message));
             }
-
-            for(ProcessAction action : actions)
+            catch (SQLException e)
             {
-                action.process();
+                throw new RuntimeException(e);  // NOTE(review): _conn is not closed on this path - confirm whether that leaks a connection
             }
 
-            _messageId.set(maxId + 1);
-        }
-        catch (SQLException e)
-        {
-            _logger.error("Error: " + e, e);
-            throw e;
         }
-        finally
+        // Returns the cached meta data, or reloads it from the store (and re-caches it) if the weak reference was cleared.
+        public StorableMessageMetaData getMetaData()
         {
-            if (inLocaltran && conn != null)
+            StorableMessageMetaData metaData = _metaDataRef.get();
+            if(metaData == null)
             {
-                conn.close();
+                try
+                {
+                    metaData = DerbyMessageStore.this.getMetaData(_messageId);
+                }
+                catch (SQLException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                _metaDataRef = new WeakReference<StorableMessageMetaData>(metaData);
             }
+
+            return metaData;
         }
 
-        if (_logger.isInfoEnabled())
+        public long getMessageNumber()
         {
-            _logger.info("Recovered message counts: " + _queueRecoveries);
+            return _messageId;
         }
 
-        for(Map.Entry<AMQShortString,Integer> entry : _queueRecoveries.entrySet())
+        public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
         {
-            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1005(entry.getValue(), String.valueOf(entry.getKey())));
-
-            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1006(String.valueOf(entry.getKey()), true));
+            DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src);  // with a null _conn the store opens, commits and closes its own connection
         }
 
-        // Free the memory
-        _queueRecoveries = null;
-
-    }
-
-    private Connection getConnection(final StoreContext context)
-    {
-        return ((ConnectionWrapper)context.getPayload()).getConnection();
-    }
-
-    private boolean getOrCreateTransaction(StoreContext context) throws AMQException
-    {
-
-        ConnectionWrapper tx = (ConnectionWrapper) context.getPayload();
-        if (tx == null)
+        public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
         {
-            beginTran(context);
-            return true;
+            return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
         }
 
-        return false;
-    }
-
-    private synchronized void stateTransition(State requiredState, State newState) throws AMQException
-    {
-        if (_state != requiredState)
+        public StoreFuture flushToStore()  // commits and closes the pending connection, if there is one
         {
-            throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
-                + "; currently in state: " + _state);
+            try
+            {
+                if(_conn != null)
+                {
+                    _conn.commit();
+                    _conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+                throw new RuntimeException(e);
+            }
+            finally
+            {
+                _conn = null;  // dropped even when commit/close failed, so a later flush does nothing
+            }
+            return IMMEDIATE_FUTURE;
         }
 
-        _state = newState;
+        public void remove()
+        {
+            flushToStore();  // flush pending writes first, then delete through the store
+            DerbyMessageStore.this.removeMessage(_messageId);
+        }
     }
+
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Sun Oct 25 22:58:57 2009
@@ -20,21 +20,20 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.commons.configuration.Configuration;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -43,9 +42,10 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.nio.ByteBuffer;
 
 /** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore extends AbstractMessageStore
+public class MemoryMessageStore implements MessageStore
 {
     private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
 
@@ -53,52 +53,74 @@
 
     private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
 
-    protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
-
-    protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
 
     private final AtomicLong _messageId = new AtomicLong(1);
     private AtomicBoolean _closed = new AtomicBoolean(false);
     private LogSubject _logSubject;
 
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
     {
-        super.configure(virtualHost,base,config);
+        public void enqueueMessage(TransactionLogResource  queue, Long messageId) throws AMQException
+        {
+        }
 
-        int hashtableCapacity = config.getStoreConfiguration().getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
-        _log.info("Using capacity " + hashtableCapacity + " for hash tables");
-        _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
-        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
-    }
+        public void dequeueMessage(TransactionLogResource  queue, Long messageId) throws AMQException
+        {
+        }
 
-    public void close() throws Exception
-    {
-        _closed.getAndSet(true);
-        if (_metaDataMap != null)
+        public void commitTran() throws AMQException
         {
-            _metaDataMap.clear();
-            _metaDataMap = null;
         }
-        if (_contentBodyMap != null)
+
+        public StoreFuture commitTranAsync() throws AMQException
+        {
+            return IMMEDIATE_FUTURE;
+        }
+
+        public void abortTran() throws AMQException
         {
-            _contentBodyMap.clear();
-            _contentBodyMap = null;
         }
 
-        super.close();
+    };
+
+    public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception
+    {
+        _logSubject = logSubject;
+        CurrentActor.get().message(_logSubject, ConfigStoreMessages.CFG_1001(this.getClass().getName()));
+
+
     }
 
-    public void removeMessage(StoreContext context, Long messageId) throws AMQException
+    public void configureMessageStore(String name,
+                                      MessageStoreRecoveryHandler recoveryHandler,
+                                      Configuration config,
+                                      LogSubject logSubject) throws Exception
     {
-        checkNotClosed();
-        if (_log.isDebugEnabled())
+        if(_logSubject == null)
         {
-            _log.debug("Removing message with id " + messageId);
+            _logSubject = logSubject;
         }
-        _metaDataMap.remove(messageId);
-        _contentBodyMap.remove(messageId);
+        int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
+        _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+        CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName()));
+    }
+
+    public void close() throws Exception
+    {
+        _closed.getAndSet(true);
+        CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003());
+
     }
 
+    public StoredMessage addMessage(StorableMessageMetaData metaData)
+    {
+        final long id = _messageId.getAndIncrement();
+        StoredMemoryMessage message = new StoredMemoryMessage(id, metaData);
+
+        return message;
+    }
+
+
     public void createExchange(Exchange exchange) throws AMQException
     {
 
@@ -135,35 +157,19 @@
         // Not required to do anything
     }
 
-    public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public void configureTransactionLog(String name,
+                                        TransactionLogRecoveryHandler recoveryHandler,
+                                        Configuration storeConfiguration,
+                                        LogSubject logSubject) throws Exception
     {
-        // Not required to do anything
+        // Not required to do anything
     }
 
-    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    public Transaction newTransaction()
     {
-        // Not required to do anything
+        return IN_MEMORY_TRANSACTION;
     }
 
-    public void beginTran(StoreContext context) throws AMQException
-    {
-        // Not required to do anything
-    }
-
-    public void commitTran(StoreContext context) throws AMQException
-    {
-        // Not required to do anything
-    }
-
-    public void abortTran(StoreContext context) throws AMQException
-    {
-        // Not required to do anything
-    }
-
-    public boolean inTran(StoreContext context)
-    {
-        return false;
-    }
 
     public List<AMQQueue> createQueues() throws AMQException
     {
@@ -175,48 +181,6 @@
         return _messageId.getAndIncrement();
     }
 
-    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
-            throws AMQException
-    {
-        checkNotClosed();
-        List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
-
-        if (bodyList == null && lastContentBody)
-        {
-            _contentBodyMap.put(messageId, Collections.singletonList(contentBody));
-        }
-        else
-        {
-            if (bodyList == null)
-            {
-                bodyList = new ArrayList<ContentChunk>();
-                _contentBodyMap.put(messageId, bodyList);
-            }
-
-            bodyList.add(index, contentBody);
-        }
-    }
-
-    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
-            throws AMQException
-    {
-        checkNotClosed();
-        _metaDataMap.put(messageId, messageMetaData);
-    }
-
-    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
-    {
-        checkNotClosed();
-        return _metaDataMap.get(messageId);
-    }
-
-    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
-    {
-        checkNotClosed();
-        List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
-        return bodyList.get(index);
-    }
-
     public boolean isPersistent()
     {
         return false;
@@ -229,4 +193,6 @@
             throw new MessageStoreClosedException();
         }
     }
+
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Sun Oct 25 22:58:57 2009
@@ -20,53 +20,43 @@
  */
 package org.apache.qpid.server.store;
 
+import org.apache.qpid.server.logging.LogSubject;
 import org.apache.commons.configuration.Configuration;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
 /**
- * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues
- * and exchanges in a transactional manner.
- *
- * <p/>All message store, remove, enqueue and dequeue operations are carried out against a {@link StoreContext} which
- * encapsulates the transactional context they are performed in. Many such operations can be carried out in a single
- * transaction.
+ * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
  *
- * <p/>The storage and removal of queues and exchanges, are not carried out in a transactional context.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities
- * <tr><td> Accept transaction boundary demarcations: Begin, Commit, Abort.
- * <tr><td> Store and remove queues.
- * <tr><td> Store and remove exchanges.
- * <tr><td> Store and remove messages.
- * <tr><td> Bind and unbind queues to exchanges.
- * <tr><td> Enqueue and dequeue messages to queues.
- * <tr><td> Generate message identifiers.
- * </table>
  */
-public interface MessageStore
+public interface MessageStore extends DurableConfigurationStore, TransactionLog
 {
+    StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
+        {
+            public boolean isComplete()
+            {
+                return true;
+            }
+
+            public void waitForCompletion()
+            {
+
+            }
+        };
+
+
     /**
      * Called after instantiation in order to configure the message store. A particular implementation can define
      * whatever parameters it wants.
      *
-     * @param virtualHost The virtual host using by this store
-     * @param base        The base element identifier from which all configuration items are relative. For example, if
-     *                    the base element is "store", the all elements used by concrete classes will be "store.foo" etc.
-     * @param hostConfig      The apache commons configuration object.
+     * @param name             The name to be used by this store
+     * @param recoveryHandler  Handler to be called as the store recovers on start up
+     * @param config           The apache commons configuration object.
      *
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
-    void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration hostConfig) throws Exception;
+    void configureMessageStore(String name,
+                               MessageStoreRecoveryHandler recoveryHandler,
+                               Configuration config,
+                               LogSubject logSubject) throws Exception;
 
     /**
      * Called to close and cleanup any resources used by the message store.
@@ -75,203 +65,16 @@
      */
     void close() throws Exception;
 
-    /**
-     * Removes the specified message from the store in the given transactional store context.
-     *
-     * @param storeContext The transactional context to remove the message in.
-     * @param messageId    Identifies the message to remove.
-     *
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
-
-    /**
-     * Makes the specified exchange persistent.
-     *
-     * @param exchange The exchange to persist.
-     *
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void createExchange(Exchange exchange) throws AMQException;
-
-    /**
-     * Removes the specified persistent exchange.
-     *
-     * @param exchange The exchange to remove.
-     *
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void removeExchange(Exchange exchange) throws AMQException;
-
-    /**
-     * Binds the specified queue to an exchange with a routing key.
-     *
-     * @param exchange   The exchange to bind to.
-     * @param routingKey The routing key to bind by.
-     * @param queue      The queue to bind.
-     * @param args       Additional parameters.
-     *
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
-
-    /**
-     * Unbinds the specified from an exchange under a particular routing key.
-     *
-     * @param exchange   The exchange to unbind from.
-     * @param routingKey The routing key to unbind.
-     * @param queue      The queue to unbind.
-     * @param args       Additonal parameters.
-     *
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
 
-    /**
-     * Makes the specified queue persistent.
-     *
-     * @param queue The queue to store.
-     *
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void createQueue(AMQQueue queue) throws AMQException;
+    public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
 
-    /**
-     * Makes the specified queue persistent.
-     *
-     * @param queue The queue to store.
-     *
-     * @param arguments The additional arguments to the binding
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
-
-    /**
-     * Removes the specified queue from the persistent store.
-     *
-     * @param queue The queue to remove.
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void removeQueue(final AMQQueue queue) throws AMQException;
-
-    /**
-     * Places a message onto a specified queue, in a given transactional context.
-     *
-     * @param context   The transactional context for the operation.
-     * @param queue     The queue to place the message on.
-     * @param messageId The message to enqueue.
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
-
-    /**
-     * Extracts a message from a specified queue, in a given transactional context.
-     *
-     * @param context   The transactional context for the operation.
-     * @param queue     The queue to place the message on.
-     * @param messageId The message to dequeue.
-     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
-     */
-    void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
-
-    /**
-     * Begins a transactional context.
-     *
-     * @param context The transactional context to begin.
-     *
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void beginTran(StoreContext context) throws AMQException;
-
-    /**
-     * Commits all operations performed within a given transactional context.
-     *
-     * @param context The transactional context to commit all operations for.
-     *
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void commitTran(StoreContext context) throws AMQException;
-
-    /**
-     * Abandons all operations performed within a given transactional context.
-     *
-     * @param context The transactional context to abandon.
-     *
-     * @throws AMQException If the operation fails for any reason.
-     */
-    void abortTran(StoreContext context) throws AMQException;
-
-    /**
-     * Tests a transactional context to see if it has been begun but not yet committed or aborted.
-     *
-     * @param context The transactional context to test.
-     *
-     * @return <tt>true</tt> if the transactional context is live, <tt>false</tt> otherwise.
-     */
-    boolean inTran(StoreContext context);
-
-    /**
-     * Return a valid, currently unused message id.
-     *
-     * @return A fresh message id.
-     */
-    Long getNewMessageId();
-
-    /**
-     * Stores a chunk of message data.
-     *
-     * @param context         The transactional context for the operation.
-     * @param messageId       The message to store the data for.
-     * @param index           The index of the data chunk.
-     * @param contentBody     The content of the data chunk.
-     * @param lastContentBody Flag to indicate that this is the last such chunk for the message.
-     *
-     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
-     */
-    void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody,
-        boolean lastContentBody) throws AMQException;
-
-    /**
-     * Stores message meta-data.
-     *
-     * @param context         The transactional context for the operation.
-     * @param messageId       The message to store the data for.
-     * @param messageMetaData The message meta data to store.
-     *
-     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
-     */
-    void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
-
-    /**
-     * Retrieves message meta-data.
-     *
-     * @param context   The transactional context for the operation.
-     * @param messageId The message to get the meta-data for.
-     *
-     * @return The message meta data.
-     *
-     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
-     */
-    MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
-
-    /**
-     * Retrieves a chunk of message data.
-     *
-     * @param context   The transactional context for the operation.
-     * @param messageId The message to get the data chunk for.
-     * @param index     The offset index of the data chunk within the message.
-     *
-     * @return A chunk of message data.
-     *
-     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
-     */
-    ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
 
     /**
      * Is this store capable of persisting the data
-     * 
+     *
      * @return true if this store is capable of persisting data
      */
     boolean isPersistent();
 
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Sun Oct 25 22:58:57 2009
@@ -35,6 +35,7 @@
     private String _name;
     private Object _payload;
 
+
     public StoreContext()
     {
         _name = "StoreContext";
@@ -68,4 +69,5 @@
     {
         return "<_name = " + _name + ", _payload = " + _payload + ">";
     }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Sun Oct 25 22:58:57 2009
@@ -46,10 +46,12 @@
     AMQQueue getQueue();
 
     QueueEntry.SubscriptionAcquiredState getOwningState();
+    QueueEntry.SubscriptionAssignedState getAssignedState();
+
 
     void setQueue(AMQQueue queue, boolean exclusive);
 
-    AMQChannel getChannel();
+    void setNoLocal(boolean noLocal);
 
     AMQShortString getConsumerTag();
 
@@ -63,23 +65,24 @@
 
     boolean isClosed();
 
-    boolean isBrowser();
+    boolean acquires();
 
-    void close();
+    boolean seesRequeues();
 
-    boolean filtersMessages();
+    void close();
 
     void send(QueueEntry msg) throws AMQException;
 
-    void queueDeleted(AMQQueue queue); 
+    void queueDeleted(AMQQueue queue);
 
 
     boolean wouldSuspend(QueueEntry msg);
 
     void getSendLock();
+
     void releaseSendLock();
 
-    void resend(final QueueEntry entry) throws AMQException;
+    void onDequeue(final QueueEntry queueEntry);
 
     void restoreCredit(final QueueEntry queueEntry);
 
@@ -87,13 +90,17 @@
 
     public State getState();
 
-    QueueEntry getLastSeenEntry();
+    AMQQueue.Context getQueueContext();
 
-    boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue);
+    void setQueueContext(AMQQueue.Context queueContext);
 
 
     boolean isActive();
 
+    void confirmAutoClose();
+
+    public void set(String key, Object value);
 
+    public Object get(String key);
 
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org