You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/04/22 13:15:21 UTC

svn commit: r1470454 [1/3] - in /qpid/trunk/qpid/java/broker/src/main: java/org/apache/qpid/server/store/ java/org/apache/qpid/server/store/derby/ java/org/apache/qpid/server/store/jdbc/ resources/META-INF/services/

Author: rgodfrey
Date: Mon Apr 22 11:15:21 2013
New Revision: 1470454

URL: http://svn.apache.org/r1470454
Log:
QPID-4763 : [Java Broker] Add JDBC store

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java   (contents, props changed)
      - copied, changed from r1447646, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java   (with props)
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java   (with props)
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (from r1447646, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java&r1=1447646&r2=1470454&rev=1470454&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Mon Apr 22 11:15:21 2013
@@ -18,21 +18,17 @@
 * under the License.
 *
 */
-package org.apache.qpid.server.store.derby;
-
+package org.apache.qpid.server.store;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
-import java.sql.Blob;
-import java.sql.CallableStatement;
 import java.sql.Connection;
-import java.sql.Driver;
+import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -55,42 +51,12 @@ import org.apache.qpid.server.binding.Bi
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfiguredObjectHelper;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.EventManager;
-import org.apache.qpid.server.store.MessageMetaDataType;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreConstants;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.State;
-import org.apache.qpid.server.store.StateManager;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
 
-/**
- * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
- * mechanism.
- *
- * TODO extract the SQL statements into a generic JDBC store
- */
-public class DerbyMessageStore implements MessageStore
+abstract public class AbstractJDBCMessageStore implements MessageStore
 {
-
-    private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
-
-    private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
-
     private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
 
     private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
@@ -108,32 +74,18 @@ public class DerbyMessageStore implement
 
     private static final int DB_VERSION = 6;
 
-
-
-    private static Class<Driver> DRIVER_CLASS;
-    public static final String MEMORY_STORE_LOCATION = ":memory:";
-
     private final AtomicLong _messageId = new AtomicLong(0);
     private AtomicBoolean _closed = new AtomicBoolean(false);
 
-    private String _connectionURL;
+    protected String _connectionURL;
 
-    private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
 
-    private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
-    private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
+    private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
+    private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
 
-    private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_id varchar(36) not null, message_id bigint not null, PRIMARY KEY (queue_id, message_id) )";
     private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
     private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
     private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
-
-
-    private static final String CREATE_META_DATA_TABLE = "CREATE TABLE " + META_DATA_TABLE_NAME
-            + " ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
-    private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE " + MESSAGE_CONTENT_TABLE_NAME
-            + " ( message_id bigint not null, content blob , PRIMARY KEY (message_id) )";
-
     private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
             + "( message_id, content ) values (?, ?)";
     private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
@@ -141,17 +93,12 @@ public class DerbyMessageStore implement
     private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME
             + " WHERE message_id = ?";
 
-    private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";;
+    private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";
     private static final String SELECT_FROM_META_DATA =
             "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
     private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
     private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
 
-    private static final String CREATE_LINKS_TABLE =
-            "CREATE TABLE "+LINKS_TABLE_NAME+" ( id_lsb bigint not null,"
-                                            + " id_msb bigint not null,"
-                                             + " create_time bigint not null,"
-                                             + " arguments blob,  PRIMARY KEY ( id_lsb, id_msb ))";
     private static final String SELECT_FROM_LINKS =
             "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb";
     private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
@@ -162,15 +109,6 @@ public class DerbyMessageStore implement
                                             + " id_msb = ?";
     private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
                                                   + "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
-
-
-    private static final String CREATE_BRIDGES_TABLE =
-            "CREATE TABLE "+BRIDGES_TABLE_NAME+" ( id_lsb bigint not null,"
-            + " id_msb bigint not null,"
-            + " create_time bigint not null,"
-            + " link_id_lsb bigint not null,"
-            + " link_id_msb bigint not null,"
-            + " arguments blob,  PRIMARY KEY ( id_lsb, id_msb ))";
     private static final String SELECT_FROM_BRIDGES =
             "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
             + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
@@ -189,34 +127,19 @@ public class DerbyMessageStore implement
                                                     + "arguments )"
                                                     + " values (?, ?, ?, ?, ?, ?)";
 
-    private static final String CREATE_XIDS_TABLE =
-            "CREATE TABLE "+XID_TABLE_NAME+" ( format bigint not null,"
-            + " global_id varchar(64) for bit data, branch_id varchar(64) for bit data,  PRIMARY KEY ( format, " +
-            "global_id, branch_id ))";
     private static final String INSERT_INTO_XIDS =
-            "INSERT INTO "+XID_TABLE_NAME+" ( format, global_id, branch_id ) values (?, ?, ?)";
+            "INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)";
     private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
                                                       + " WHERE format = ? and global_id = ? and branch_id = ?";
     private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
-
-
-    private static final String CREATE_XID_ACTIONS_TABLE =
-            "CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null,"
-            + " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " +
-            "action_type char not null, queue_id varchar(36) not null, message_id bigint not null" +
-            ",  PRIMARY KEY ( " +
-            "format, global_id, branch_id, action_type, queue_id, message_id))";
     private static final String INSERT_INTO_XID_ACTIONS =
-            "INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " +
+            "INSERT INTO "+ XID_ACTIONS_TABLE_NAME +" ( format, global_id, branch_id, action_type, " +
             "queue_id, message_id ) values (?,?,?,?,?,?) ";
     private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
                                                    + " WHERE format = ? and global_id = ? and branch_id = ?";
     private static final String SELECT_ALL_FROM_XID_ACTIONS =
             "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME +
             " WHERE format = ? and global_id = ? and branch_id = ?";
-
-    private static final String CREATE_CONFIGURED_OBJECTS_TABLE = "CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
-            + " ( id VARCHAR(36) not null, object_type varchar(255), attributes blob,  PRIMARY KEY (id))";
     private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
             + " ( id, object_type, attributes) VALUES (?,?,?)";
     private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
@@ -227,29 +150,17 @@ public class DerbyMessageStore implement
             + " where id = ?";
     private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
 
-    private final Charset UTF8_CHARSET = Charset.forName("UTF-8");
-
-    private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
-
-    public static final String TYPE = "DERBY";
+    protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
 
-    private final StateManager _stateManager;
+    protected final EventManager _eventManager = new EventManager();
 
-    private final EventManager _eventManager = new EventManager();
-
-    private long _totalStoreSize;
-    private boolean _limitBusted;
-    private long _persistentSizeLowThreshold;
-    private long _persistentSizeHighThreshold;
+    protected final StateManager _stateManager;
 
     private MessageStoreRecoveryHandler _messageRecoveryHandler;
-
     private TransactionLogRecoveryHandler _tlogRecoveryHandler;
-
     private ConfigurationRecoveryHandler _configRecoveryHandler;
-    private String _storeLocation;
 
-    public DerbyMessageStore()
+    public AbstractJDBCMessageStore()
     {
         _stateManager = new StateManager(_eventManager);
     }
@@ -297,61 +208,23 @@ public class DerbyMessageStore implement
     private void commonConfiguration(String name, Configuration storeConfiguration)
             throws ClassNotFoundException, SQLException
     {
-        initialiseDriver();
+        implementationSpecificConfiguration(name, storeConfiguration);
+        createOrOpenDatabase();
 
-        //Update to pick up QPID_WORK and use that as the default location not just derbyDB
-
-        final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
-                + File.separator + "derbyDB");
-
-        if(!MEMORY_STORE_LOCATION.equals(databasePath))
-        {
-            File environmentPath = new File(databasePath);
-            if (!environmentPath.exists())
-            {
-                if (!environmentPath.mkdirs())
-                {
-                    throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
-                        + "Ensure the path is correct and that the permissions are correct.");
-                }
-            }
-        }
+    }
 
-        _storeLocation = databasePath;
+    protected abstract void implementationSpecificConfiguration(String name, Configuration storeConfiguration) throws ClassNotFoundException;
 
-        _persistentSizeHighThreshold = storeConfiguration.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, -1l);
-        _persistentSizeLowThreshold = storeConfiguration.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
-        if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
-        {
-            _persistentSizeLowThreshold = _persistentSizeHighThreshold;
-        }
+    abstract protected Logger getLogger();
 
-        createOrOpenDatabase(name, databasePath);
+    abstract protected String getSqlBlobType();
 
-        Connection conn = newAutoCommitConnection();;
-        try
-        {
-            _totalStoreSize = getSizeOnDisk(conn);
-        }
-        finally
-        {
-            conn.close();
-        }
-    }
+    abstract protected String getSqlVarBinaryType(int size);
 
-    private static synchronized void initialiseDriver() throws ClassNotFoundException
-    {
-        if(DRIVER_CLASS == null)
-        {
-            DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
-        }
-    }
+    abstract protected String getSqlBigIntType();
 
-    private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
+    protected void createOrOpenDatabase() throws SQLException
     {
-        //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
-        _connectionURL = "jdbc:derby" + (environmentPath.equals(MEMORY_STORE_LOCATION) ? environmentPath : ":" + environmentPath + "/") + name + ";create=true";
-
         Connection conn = newAutoCommitConnection();
 
         createVersionTable(conn);
@@ -366,8 +239,6 @@ public class DerbyMessageStore implement
         conn.close();
     }
 
-
-
     private void createVersionTable(final Connection conn) throws SQLException
     {
         if(!tableExists(DB_VERSION_TABLE_NAME, conn))
@@ -403,7 +274,8 @@ public class DerbyMessageStore implement
             Statement stmt = conn.createStatement();
             try
             {
-                stmt.execute(CREATE_CONFIGURED_OBJECTS_TABLE);
+                stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
+                        + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+",  PRIMARY KEY (id))");
             }
             finally
             {
@@ -419,7 +291,8 @@ public class DerbyMessageStore implement
             Statement stmt = conn.createStatement();
             try
             {
-                stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
+                stmt.execute("CREATE TABLE "+ QUEUE_ENTRY_TABLE_NAME +" ( queue_id varchar(36) not null, message_id "
+                        + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )");
             }
             finally
             {
@@ -436,7 +309,13 @@ public class DerbyMessageStore implement
             Statement stmt = conn.createStatement();
             try
             {
-                stmt.execute(CREATE_META_DATA_TABLE);
+                stmt.execute("CREATE TABLE "
+                             + META_DATA_TABLE_NAME
+                             + " ( message_id "
+                             + getSqlBigIntType()
+                             + " not null, meta_data "
+                             + getSqlBlobType()
+                             + ", PRIMARY KEY ( message_id ) )");
             }
             finally
             {
@@ -446,7 +325,6 @@ public class DerbyMessageStore implement
 
     }
 
-
     private void createMessageContentTable(final Connection conn) throws SQLException
     {
         if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
@@ -454,7 +332,13 @@ public class DerbyMessageStore implement
             Statement stmt = conn.createStatement();
             try
             {
-            stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
+                stmt.execute("CREATE TABLE "
+                             + MESSAGE_CONTENT_TABLE_NAME
+                             + " ( message_id "
+                             + getSqlBigIntType()
+                             + " not null, content "
+                             + getSqlBlobType()
+                             + ", PRIMARY KEY (message_id) )");
             }
             finally
             {
@@ -471,7 +355,10 @@ public class DerbyMessageStore implement
             Statement stmt = conn.createStatement();
             try
             {
-                stmt.execute(CREATE_LINKS_TABLE);
+                stmt.execute("CREATE TABLE "+ LINKS_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null,"
+                                                + " id_msb " + getSqlBigIntType() + " not null,"
+                                                 + " create_time " + getSqlBigIntType() + " not null,"
+                                                 + " arguments "+getSqlBlobType()+",  PRIMARY KEY ( id_lsb, id_msb ))");
             }
             finally
             {
@@ -480,7 +367,6 @@ public class DerbyMessageStore implement
         }
     }
 
-
     private void createBridgeTable(final Connection conn) throws SQLException
     {
         if(!tableExists(BRIDGES_TABLE_NAME, conn))
@@ -488,7 +374,12 @@ public class DerbyMessageStore implement
             Statement stmt = conn.createStatement();
             try
             {
-                stmt.execute(CREATE_BRIDGES_TABLE);
+                stmt.execute("CREATE TABLE "+ BRIDGES_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null,"
+                + " id_msb " + getSqlBigIntType() + " not null,"
+                + " create_time " + getSqlBigIntType() + " not null,"
+                + " link_id_lsb " + getSqlBigIntType() + " not null,"
+                + " link_id_msb " + getSqlBigIntType() + " not null,"
+                + " arguments "+getSqlBlobType()+",  PRIMARY KEY ( id_lsb, id_msb ))");
             }
             finally
             {
@@ -504,7 +395,16 @@ public class DerbyMessageStore implement
             Statement stmt = conn.createStatement();
             try
             {
-                stmt.execute(CREATE_XIDS_TABLE);
+                stmt.execute("CREATE TABLE "
+                             + XID_TABLE_NAME
+                             + " ( format " + getSqlBigIntType() + " not null,"
+                             + " global_id "
+                             + getSqlVarBinaryType(64)
+                             + ", branch_id "
+                             + getSqlVarBinaryType(64)
+                             + " ,  PRIMARY KEY ( format, "
+                             +
+                             "global_id, branch_id ))");
             }
             finally
             {
@@ -513,7 +413,6 @@ public class DerbyMessageStore implement
         }
     }
 
-
     private void createXidActionTable(final Connection conn) throws SQLException
     {
         if(!tableExists(XID_ACTIONS_TABLE_NAME, conn))
@@ -521,7 +420,12 @@ public class DerbyMessageStore implement
             Statement stmt = conn.createStatement();
             try
             {
-                stmt.execute(CREATE_XID_ACTIONS_TABLE);
+                stmt.execute("CREATE TABLE " + XID_ACTIONS_TABLE_NAME + " ( format " + getSqlBigIntType() + " not null,"
+                             + " global_id " + getSqlVarBinaryType(64) + " not null, branch_id " + getSqlVarBinaryType(
+                        64) + " not null, " +
+                             "action_type char not null, queue_id varchar(36) not null, message_id " + getSqlBigIntType() + " not null" +
+                             ",  PRIMARY KEY ( " +
+                             "format, global_id, branch_id, action_type, queue_id, message_id))");
             }
             finally
             {
@@ -530,29 +434,31 @@ public class DerbyMessageStore implement
         }
     }
 
-    private boolean tableExists(final String tableName, final Connection conn) throws SQLException
+    protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
     {
-        PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
+        DatabaseMetaData metaData = conn.getMetaData();
+        ResultSet rs = metaData.getTables(null, null, "%", null);
+
         try
         {
-            stmt.setString(1, tableName);
-            ResultSet rs = stmt.executeQuery();
-            try
-            {
-                return rs.next();
-            }
-            finally
+
+            while(rs.next())
             {
-                rs.close();
+                final String table = rs.getString(3);
+                if(tableName.equalsIgnoreCase(table))
+                {
+                    return true;
+                }
             }
+            return false;
         }
         finally
         {
-            stmt.close();
+            rs.close();
         }
     }
 
-    private void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
+    protected void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
     {
         try
         {
@@ -581,34 +487,20 @@ public class DerbyMessageStore implement
         _closed.getAndSet(true);
         _stateManager.attainState(State.CLOSING);
 
-        try
-        {
-            Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true");
-            // Shouldn't reach this point - shutdown=true should throw SQLException
-            conn.close();
-            _logger.error("Unable to shut down the store");
-        }
-        catch (SQLException e)
-        {
-            if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE))
-            {
-                //expected and represents a clean shutdown of this database only, do nothing.
-            }
-            else
-            {
-                _logger.error("Exception whilst shutting down the store: " + e);
-            }
-        }
+        doClose();
 
         _stateManager.attainState(State.CLOSED);
     }
 
+
+    protected abstract void doClose() throws Exception;
+
     @Override
     public StoredMessage addMessage(StorableMessageMetaData metaData)
     {
         if(metaData.isPersistent())
         {
-            return new StoredDerbyMessage(_messageId.incrementAndGet(), metaData);
+            return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData);
         }
         else
         {
@@ -637,12 +529,12 @@ public class DerbyMessageStore implement
 
                     if (results == 0)
                     {
-                        _logger.warn("Message metadata not found for message id " + messageId);
+                        getLogger().warn("Message metadata not found for message id " + messageId);
                     }
 
-                    if (_logger.isDebugEnabled())
+                    if (getLogger().isDebugEnabled())
                     {
-                        _logger.debug("Deleted metadata for message " + messageId);
+                        getLogger().debug("Deleted metadata for message " + messageId);
                     }
 
                     stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
@@ -733,7 +625,7 @@ public class DerbyMessageStore implement
     @Override
     public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
     {
-        _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
+        getLogger().debug("public void createQueue(AMQQueue queue = " + queue + "): called");
 
         if (_stateManager.isInState(State.ACTIVE))
         {
@@ -749,7 +641,7 @@ public class DerbyMessageStore implement
      * NOTE: Currently only updates the exclusivity.
      *
      * @param queue The queue to update the entry for.
-     * @throws AMQStoreException If the operation fails for any reason.
+     * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
      */
     @Override
     public void updateQueue(final AMQQueue queue) throws AMQStoreException
@@ -766,11 +658,12 @@ public class DerbyMessageStore implement
 
     }
 
+
     /**
      * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
      * isolation and with auto-commit transactions enabled.
      */
-    private Connection newAutoCommitConnection() throws SQLException
+    protected Connection newAutoCommitConnection() throws SQLException
     {
         final Connection connection = newConnection();
         try
@@ -797,7 +690,7 @@ public class DerbyMessageStore implement
      * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
      * isolation and with auto-commit transactions disabled.
      */
-    private Connection newConnection() throws SQLException
+    protected Connection newConnection() throws SQLException
     {
         final Connection connection = DriverManager.getConnection(_connectionURL);
         try
@@ -823,7 +716,7 @@ public class DerbyMessageStore implement
     public void removeQueue(final AMQQueue queue) throws AMQStoreException
     {
         AMQShortString name = queue.getNameShortString();
-        _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+        getLogger().debug("public void removeQueue(AMQShortString name = " + name + "): called");
         int results = removeConfiguredObject(queue.getId());
         if (results == 0)
         {
@@ -863,12 +756,10 @@ public class DerbyMessageStore implement
         return argumentBytes;
     }
 
-
-
     @Override
     public Transaction newTransaction()
     {
-        return new DerbyTransaction();
+        return new JDBCTransaction();
     }
 
     public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
@@ -878,9 +769,18 @@ public class DerbyMessageStore implement
 
         try
         {
-            if (_logger.isDebugEnabled())
+            if (getLogger().isDebugEnabled())
             {
-                _logger.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + queue.getId()+ "[Connection" + conn + "]");
+                getLogger().debug("Enqueuing message "
+                                   + messageId
+                                   + " on queue "
+                                   + (queue instanceof AMQQueue
+                                      ? ((AMQQueue) queue).getName()
+                                      : "")
+                                   + queue.getId()
+                                   + "[Connection"
+                                   + conn
+                                   + "]");
             }
 
             PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
@@ -894,10 +794,11 @@ public class DerbyMessageStore implement
             {
                 stmt.close();
             }
+
         }
         catch (SQLException e)
         {
-            _logger.error("Failed to enqueue: " + e.getMessage(), e);
+            getLogger().error("Failed to enqueue: " + e.getMessage(), e);
             throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
                 + " to database", e);
         }
@@ -916,7 +817,7 @@ public class DerbyMessageStore implement
             try
             {
                 stmt.setString(1, queue.getId().toString());
-                stmt.setLong(2,messageId);
+                stmt.setLong(2, messageId);
                 int results = stmt.executeUpdate();
 
 
@@ -927,27 +828,29 @@ public class DerbyMessageStore implement
                            + " with id " + queue.getId());
                 }
 
-                if (_logger.isDebugEnabled())
+                if (getLogger().isDebugEnabled())
                 {
-                    _logger.debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
-                            + " with id " + queue.getId());
+                    getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue
+                                                                                          ? ((AMQQueue) queue).getName()
+                                                                                          : "")
+                                       + " with id " + queue.getId());
                 }
             }
             finally
             {
                 stmt.close();
             }
+
         }
         catch (SQLException e)
         {
-            _logger.error("Failed to dequeue: " + e.getMessage(), e);
+            getLogger().error("Failed to dequeue: " + e.getMessage(), e);
             throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
                     + " with id " + queue.getId() + " from database", e);
         }
 
     }
 
-
     private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId)
             throws AMQStoreException
     {
@@ -993,13 +896,12 @@ public class DerbyMessageStore implement
         }
         catch (SQLException e)
         {
-            _logger.error("Failed to dequeue: " + e.getMessage(), e);
+            getLogger().error("Failed to dequeue: " + e.getMessage(), e);
             throw new AMQStoreException("Error deleting enqueued message with xid", e);
         }
 
     }
 
-
     private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
                            Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws AMQStoreException
     {
@@ -1061,7 +963,7 @@ public class DerbyMessageStore implement
         }
         catch (SQLException e)
         {
-            _logger.error("Failed to enqueue: " + e.getMessage(), e);
+            getLogger().error("Failed to enqueue: " + e.getMessage(), e);
             throw new AMQStoreException("Error writing xid ", e);
         }
 
@@ -1091,9 +993,9 @@ public class DerbyMessageStore implement
             Connection conn = connWrapper.getConnection();
             conn.commit();
 
-            if (_logger.isDebugEnabled())
+            if (getLogger().isDebugEnabled())
             {
-                _logger.debug("commit tran completed");
+                getLogger().debug("commit tran completed");
             }
 
             conn.close();
@@ -1121,9 +1023,9 @@ public class DerbyMessageStore implement
             throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran");
         }
 
-        if (_logger.isDebugEnabled())
+        if (getLogger().isDebugEnabled())
         {
-            _logger.debug("abort tran called: " + connWrapper.getConnection());
+            getLogger().debug("abort tran called: " + connWrapper.getConnection());
         }
 
         try
@@ -1144,13 +1046,12 @@ public class DerbyMessageStore implement
         return _messageId.incrementAndGet();
     }
 
-
     private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
         throws SQLException
     {
-        if(_logger.isDebugEnabled())
+        if(getLogger().isDebugEnabled())
         {
-            _logger.debug("Adding metadata for message " +messageId);
+            getLogger().debug("Adding metadata for message " + messageId);
         }
 
         PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
@@ -1161,7 +1062,7 @@ public class DerbyMessageStore implement
             final int bodySize = 1 + metaData.getStorableSize();
             byte[] underlying = new byte[bodySize];
             underlying[0] = (byte) metaData.getType().ordinal();
-            java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+            ByteBuffer buf = ByteBuffer.wrap(underlying);
             buf.position(1);
             buf = buf.slice();
 
@@ -1198,10 +1099,7 @@ public class DerbyMessageStore implement
 
     }
 
-
-
-
-    private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
+    protected void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
     {
         Connection conn = newAutoCommitConnection();
         try
@@ -1221,20 +1119,19 @@ public class DerbyMessageStore implement
                     {
 
                         long messageId = rs.getLong(1);
-                        Blob dataAsBlob = rs.getBlob(2);
-
                         if(messageId > maxId)
                         {
                             maxId = messageId;
                         }
 
-                        byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
-                        java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+                        byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+
+                        ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
                         buf.position(1);
                         buf = buf.slice();
                         MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
                         StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
-                        StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true);
+                        StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
                         messageHandler.message(message);
                     }
 
@@ -1259,8 +1156,7 @@ public class DerbyMessageStore implement
     }
 
 
-
-    private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+    protected TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
     {
         Connection conn = newAutoCommitConnection();
         try
@@ -1378,7 +1274,7 @@ public class DerbyMessageStore implement
         }
     }
 
-    private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
+    protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
     {
         Connection conn = newAutoCommitConnection();
         try
@@ -1481,10 +1377,8 @@ public class DerbyMessageStore implement
 
                     if(rs.next())
                     {
-                        Blob dataAsBlob = rs.getBlob(1);
-
-                        byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
-                        java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+                        byte[] dataAsBytes = getBlobAsBytes(rs, 1);
+                        ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
                         buf.position(1);
                         buf = buf.slice();
                         MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
@@ -1513,12 +1407,13 @@ public class DerbyMessageStore implement
         }
     }
 
+    protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException;
 
     private void addContent(Connection conn, long messageId, ByteBuffer src)
     {
-        if(_logger.isDebugEnabled())
+        if(getLogger().isDebugEnabled())
         {
-            _logger.debug("Adding content for message " +messageId);
+            getLogger().debug("Adding content for message " + messageId);
         }
         PreparedStatement stmt = null;
 
@@ -1548,7 +1443,6 @@ public class DerbyMessageStore implement
 
     }
 
-
     public int getContent(long messageId, int offset, ByteBuffer dst)
     {
         Connection conn = null;
@@ -1567,10 +1461,8 @@ public class DerbyMessageStore implement
             if (rs.next())
             {
 
-                Blob dataAsBlob = rs.getBlob(1);
-
-                final int size = (int) dataAsBlob.length();
-                byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
+                byte[] dataAsBytes = getBlobAsBytes(rs, 1);
+                int size = dataAsBytes.length;
 
                 if (offset > size)
                 {
@@ -1611,13 +1503,13 @@ public class DerbyMessageStore implement
     }
 
 
-    private class DerbyTransaction implements Transaction
+    protected class JDBCTransaction implements Transaction
     {
         private final ConnectionWrapper _connWrapper;
         private int _storeSizeIncrease;
 
 
-        private DerbyTransaction()
+        protected JDBCTransaction()
         {
             try
             {
@@ -1633,11 +1525,11 @@ public class DerbyMessageStore implement
         public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
             final StoredMessage storedMessage = message.getStoredMessage();
-            if(storedMessage instanceof StoredDerbyMessage)
+            if(storedMessage instanceof StoredJDBCMessage)
             {
                 try
                 {
-                    ((StoredDerbyMessage) storedMessage).store(_connWrapper.getConnection());
+                    ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
                 }
                 catch (SQLException e)
                 {
@@ -1645,27 +1537,27 @@ public class DerbyMessageStore implement
                 }
             }
             _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
-            DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
+            AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
         }
 
         @Override
         public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
-            DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
+            AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
 
         }
 
         @Override
         public void commitTran() throws AMQStoreException
         {
-            DerbyMessageStore.this.commitTran(_connWrapper);
+            AbstractJDBCMessageStore.this.commitTran(_connWrapper);
             storedSizeChange(_storeSizeIncrease);
         }
 
         @Override
         public StoreFuture commitTranAsync() throws AMQStoreException
         {
-            final StoreFuture storeFuture = DerbyMessageStore.this.commitTranAsync(_connWrapper);
+            final StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
             storedSizeChange(_storeSizeIncrease);
             return storeFuture;
         }
@@ -1673,26 +1565,24 @@ public class DerbyMessageStore implement
         @Override
         public void abortTran() throws AMQStoreException
         {
-            DerbyMessageStore.this.abortTran(_connWrapper);
+            AbstractJDBCMessageStore.this.abortTran(_connWrapper);
         }
 
         @Override
         public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
         {
-            DerbyMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
+            AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
         }
 
         @Override
         public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
                 throws AMQStoreException
         {
-            DerbyMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
+            AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
         }
     }
 
-
-
-    private class StoredDerbyMessage implements StoredMessage
+    private class StoredJDBCMessage implements StoredMessage
     {
 
         private final long _messageId;
@@ -1704,14 +1594,14 @@ public class DerbyMessageStore implement
         private volatile SoftReference<byte[]> _dataRef;
 
 
-        StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
+        StoredJDBCMessage(long messageId, StorableMessageMetaData metaData)
         {
             this(messageId, metaData, false);
         }
 
 
-        StoredDerbyMessage(long messageId,
-                           StorableMessageMetaData metaData, boolean isRecovered)
+        StoredJDBCMessage(long messageId,
+                          StorableMessageMetaData metaData, boolean isRecovered)
         {
             _messageId = messageId;
             _isRecovered = isRecovered;
@@ -1731,7 +1621,7 @@ public class DerbyMessageStore implement
             {
                 try
                 {
-                    metaData = DerbyMessageStore.this.getMetaData(_messageId);
+                    metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId);
                 }
                 catch (SQLException e)
                 {
@@ -1750,7 +1640,7 @@ public class DerbyMessageStore implement
         }
 
         @Override
-        public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
+        public void addContent(int offsetInMessage, ByteBuffer src)
         {
             src = src.slice();
 
@@ -1773,7 +1663,7 @@ public class DerbyMessageStore implement
         }
 
         @Override
-        public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
+        public int getContent(int offsetInMessage, ByteBuffer dst)
         {
             byte[] data = _dataRef == null ? null : _dataRef.get();
             if(data != null)
@@ -1784,7 +1674,7 @@ public class DerbyMessageStore implement
             }
             else
             {
-                return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+                return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst);
             }
         }
 
@@ -1817,9 +1707,9 @@ public class DerbyMessageStore implement
             }
             catch (SQLException e)
             {
-                if(_logger.isDebugEnabled())
+                if(getLogger().isDebugEnabled())
                 {
-                    _logger.debug("Error when trying to flush message " + _messageId + " to store: " + e);
+                    getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e);
                 }
                 throw new RuntimeException(e);
             }
@@ -1834,7 +1724,7 @@ public class DerbyMessageStore implement
         public void remove()
         {
             int delta = getMetaData().getContentSize();
-            DerbyMessageStore.this.removeMessage(_messageId);
+            AbstractJDBCMessageStore.this.removeMessage(_messageId);
             storedSizeChange(-delta);
         }
 
@@ -1845,7 +1735,7 @@ public class DerbyMessageStore implement
                 try
                 {
                     storeMetaData(conn, _messageId, _metaData);
-                    DerbyMessageStore.this.addContent(conn, _messageId,
+                    AbstractJDBCMessageStore.this.addContent(conn, _messageId,
                                                       _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
                 }
                 finally
@@ -1854,9 +1744,9 @@ public class DerbyMessageStore implement
                     _data = null;
                 }
 
-                if(_logger.isDebugEnabled())
+                if(getLogger().isDebugEnabled())
                 {
-                    _logger.debug("Storing message " + _messageId + " to store");
+                    getLogger().debug("Storing message " + _messageId + " to store");
                 }
             }
         }
@@ -1867,7 +1757,7 @@ public class DerbyMessageStore implement
         }
     }
 
-    private void closeConnection(final Connection conn)
+    protected void closeConnection(final Connection conn)
     {
         if(conn != null)
         {
@@ -1877,12 +1767,12 @@ public class DerbyMessageStore implement
            }
            catch (SQLException e)
            {
-               _logger.error("Problem closing connection", e);
+               getLogger().error("Problem closing connection", e);
            }
         }
     }
 
-    private void closePreparedStatement(final PreparedStatement stmt)
+    protected void closePreparedStatement(final PreparedStatement stmt)
     {
         if (stmt != null)
         {
@@ -1892,7 +1782,7 @@ public class DerbyMessageStore implement
             }
             catch(SQLException e)
             {
-                _logger.error("Problem closing prepared statement", e);
+                getLogger().error("Problem closing prepared statement", e);
             }
         }
     }
@@ -1903,12 +1793,6 @@ public class DerbyMessageStore implement
         _eventManager.addEventListener(eventListener, events);
     }
 
-    @Override
-    public String getStoreLocation()
-    {
-        return _storeLocation;
-    }
-
     private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws AMQStoreException
     {
         if (_stateManager.isInState(State.ACTIVE))
@@ -2085,12 +1969,7 @@ public class DerbyMessageStore implement
                         if (rs.next())
                         {
                             String type = rs.getString(1);
-                            Blob blob = rs.getBlob(2);
-                            String attributes = null;
-                            if (blob != null)
-                            {
-                                attributes = blobToString(blob);
-                            }
+                            String attributes = getBlobAsString(rs, 2);
                             result = new ConfiguredObjectRecord(id, type, attributes);
                         }
                     }
@@ -2117,12 +1996,6 @@ public class DerbyMessageStore implement
         return result;
     }
 
-    private String blobToString(Blob blob) throws SQLException
-    {
-        byte[] bytes = blob.getBytes(1, (int)blob.length());
-        return new String(bytes, UTF8_CHARSET);
-    }
-
     private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException
     {
         ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
@@ -2139,7 +2012,7 @@ public class DerbyMessageStore implement
                     {
                         String id = rs.getString(1);
                         String objectType = rs.getString(2);
-                        String attributes = blobToString(rs.getBlob(3));
+                        String attributes = getBlobAsString(rs, 3);
                         results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes));
                     }
                 }
@@ -2160,180 +2033,8 @@ public class DerbyMessageStore implement
         return results;
     }
 
-    private synchronized void storedSizeChange(final int delta)
-    {
-        if(getPersistentSizeHighThreshold() > 0)
-        {
-            synchronized(this)
-            {
-                // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
-                // time, so we do so only when there's been enough change that it is worth looking again. We do this by
-                // assuming the total size will change by less than twice the amount of the message data change.
-                long newSize = _totalStoreSize += 3*delta;
-
-                Connection conn = null;
-                try
-                {
-
-                    if(!_limitBusted &&  newSize > getPersistentSizeHighThreshold())
-                    {
-                        conn = newAutoCommitConnection();
-                        _totalStoreSize = getSizeOnDisk(conn);
-                        if(_totalStoreSize > getPersistentSizeHighThreshold())
-                        {
-                            _limitBusted = true;
-                            _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
-                        }
-                    }
-                    else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
-                    {
-                        long oldSize = _totalStoreSize;
-                        conn = newAutoCommitConnection();
-                        _totalStoreSize = getSizeOnDisk(conn);
-                        if(oldSize <= _totalStoreSize)
-                        {
-
-                            reduceSizeOnDisk(conn);
-
-                            _totalStoreSize = getSizeOnDisk(conn);
-                        }
-
-                        if(_totalStoreSize < getPersistentSizeLowThreshold())
-                        {
-                            _limitBusted = false;
-                            _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
-                        }
-
-
-                    }
-                }
-                catch (SQLException e)
-                {
-                    closeConnection(conn);
-                    throw new RuntimeException("Exception will processing store size change", e);
-                }
-            }
-        }
-    }
-
-    private void reduceSizeOnDisk(Connection conn)
-    {
-        CallableStatement cs = null;
-        PreparedStatement stmt = null;
-        try
-        {
-            String tableQuery =
-                    "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
-            stmt = conn.prepareStatement(tableQuery);
-            ResultSet rs = null;
-
-            List<String> schemas = new ArrayList<String>();
-            List<String> tables = new ArrayList<String>();
-
-            try
-            {
-                rs = stmt.executeQuery();
-                while(rs.next())
-                {
-                    schemas.add(rs.getString(1));
-                    tables.add(rs.getString(2));
-                }
-            }
-            finally
-            {
-                if(rs != null)
-                {
-                    rs.close();
-                }
-            }
-
+    protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
 
-            cs = conn.prepareCall
-                    ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
-
-            for(int i = 0; i < schemas.size(); i++)
-            {
-                cs.setString(1, schemas.get(i));
-                cs.setString(2, tables.get(i));
-                cs.setShort(3, (short) 0);
-                cs.execute();
-            }
-        }
-        catch (SQLException e)
-        {
-            closeConnection(conn);
-            throw new RuntimeException("Error reducing on disk size", e);
-        }
-        finally
-        {
-            closePreparedStatement(stmt);
-            closePreparedStatement(cs);
-        }
-
-    }
-
-    private long getSizeOnDisk(Connection conn)
-    {
-        PreparedStatement stmt = null;
-        try
-        {
-            String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
-                    "    FROM " +
-                    "        SYS.SYSTABLES systabs," +
-                    "        TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
-                    "    WHERE systabs.tabletype = 'T'";
-
-            stmt = conn.prepareStatement(sizeQuery);
-
-            ResultSet rs = null;
-            long size = 0l;
-
-            try
-            {
-                rs = stmt.executeQuery();
-                while(rs.next())
-                {
-                    size = rs.getLong(1);
-                }
-            }
-            finally
-            {
-                if(rs != null)
-                {
-                    rs.close();
-                }
-            }
-
-            return size;
-
-        }
-        catch (SQLException e)
-        {
-            closeConnection(conn);
-            throw new RuntimeException("Error establishing on disk size", e);
-        }
-        finally
-        {
-            closePreparedStatement(stmt);
-        }
-
-    }
-
-
-    private long getPersistentSizeLowThreshold()
-    {
-        return _persistentSizeLowThreshold;
-    }
-
-    private long getPersistentSizeHighThreshold()
-    {
-        return _persistentSizeHighThreshold;
-    }
-
-    @Override
-    public String getStoreType()
-    {
-        return TYPE;
-    }
+    protected abstract void storedSizeChange(int storeSizeIncrease);
 
-}
\ No newline at end of file
+}

Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Apr 22 11:15:21 2013
@@ -0,0 +1,6 @@
+/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:886720-886722,887145,892761,930288
+/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:805429-821809
+/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:787599
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:1061302-1072333
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org