You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/04/22 13:15:21 UTC
svn commit: r1470454 [1/3] - in /qpid/trunk/qpid/java/broker/src/main:
java/org/apache/qpid/server/store/ java/org/apache/qpid/server/store/derby/
java/org/apache/qpid/server/store/jdbc/ resources/META-INF/services/
Author: rgodfrey
Date: Mon Apr 22 11:15:21 2013
New Revision: 1470454
URL: http://svn.apache.org/r1470454
Log:
QPID-4763 : [Java Broker] Add JDBC store
Added:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (contents, props changed)
- copied, changed from r1447646, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java (with props)
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreFactory.java (with props)
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
qpid/trunk/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (from r1447646, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java&r1=1447646&r2=1470454&rev=1470454&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Mon Apr 22 11:15:21 2013
@@ -18,21 +18,17 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.derby;
-
+package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
-import java.sql.Blob;
-import java.sql.CallableStatement;
import java.sql.Connection;
-import java.sql.Driver;
+import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -55,42 +51,12 @@ import org.apache.qpid.server.binding.Bi
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfiguredObjectHelper;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.Event;
-import org.apache.qpid.server.store.EventListener;
-import org.apache.qpid.server.store.EventManager;
-import org.apache.qpid.server.store.MessageMetaDataType;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreConstants;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.State;
-import org.apache.qpid.server.store.StateManager;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.store.StoredMemoryMessage;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
-/**
- * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
- * mechanism.
- *
- * TODO extract the SQL statements into a generic JDBC store
- */
-public class DerbyMessageStore implements MessageStore
+abstract public class AbstractJDBCMessageStore implements MessageStore
{
-
- private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
-
- private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
-
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
@@ -108,32 +74,18 @@ public class DerbyMessageStore implement
private static final int DB_VERSION = 6;
-
-
- private static Class<Driver> DRIVER_CLASS;
- public static final String MEMORY_STORE_LOCATION = ":memory:";
-
private final AtomicLong _messageId = new AtomicLong(0);
private AtomicBoolean _closed = new AtomicBoolean(false);
- private String _connectionURL;
+ protected String _connectionURL;
- private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
- private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
- private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
+ private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
+ private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
- private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_id varchar(36) not null, message_id bigint not null, PRIMARY KEY (queue_id, message_id) )";
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
-
-
- private static final String CREATE_META_DATA_TABLE = "CREATE TABLE " + META_DATA_TABLE_NAME
- + " ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
- private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE " + MESSAGE_CONTENT_TABLE_NAME
- + " ( message_id bigint not null, content blob , PRIMARY KEY (message_id) )";
-
private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
+ "( message_id, content ) values (?, ?)";
private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
@@ -141,17 +93,12 @@ public class DerbyMessageStore implement
private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME
+ " WHERE message_id = ?";
- private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";;
+ private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";
private static final String SELECT_FROM_META_DATA =
"SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
- private static final String CREATE_LINKS_TABLE =
- "CREATE TABLE "+LINKS_TABLE_NAME+" ( id_lsb bigint not null,"
- + " id_msb bigint not null,"
- + " create_time bigint not null,"
- + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
private static final String SELECT_FROM_LINKS =
"SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb";
private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
@@ -162,15 +109,6 @@ public class DerbyMessageStore implement
+ " id_msb = ?";
private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
+ "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
-
-
- private static final String CREATE_BRIDGES_TABLE =
- "CREATE TABLE "+BRIDGES_TABLE_NAME+" ( id_lsb bigint not null,"
- + " id_msb bigint not null,"
- + " create_time bigint not null,"
- + " link_id_lsb bigint not null,"
- + " link_id_msb bigint not null,"
- + " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
private static final String SELECT_FROM_BRIDGES =
"SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
+ BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
@@ -189,34 +127,19 @@ public class DerbyMessageStore implement
+ "arguments )"
+ " values (?, ?, ?, ?, ?, ?)";
- private static final String CREATE_XIDS_TABLE =
- "CREATE TABLE "+XID_TABLE_NAME+" ( format bigint not null,"
- + " global_id varchar(64) for bit data, branch_id varchar(64) for bit data, PRIMARY KEY ( format, " +
- "global_id, branch_id ))";
private static final String INSERT_INTO_XIDS =
- "INSERT INTO "+XID_TABLE_NAME+" ( format, global_id, branch_id ) values (?, ?, ?)";
+ "INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)";
private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
+ " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
-
-
- private static final String CREATE_XID_ACTIONS_TABLE =
- "CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null,"
- + " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " +
- "action_type char not null, queue_id varchar(36) not null, message_id bigint not null" +
- ", PRIMARY KEY ( " +
- "format, global_id, branch_id, action_type, queue_id, message_id))";
private static final String INSERT_INTO_XID_ACTIONS =
- "INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " +
+ "INSERT INTO "+ XID_ACTIONS_TABLE_NAME +" ( format, global_id, branch_id, action_type, " +
"queue_id, message_id ) values (?,?,?,?,?,?) ";
private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
+ " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XID_ACTIONS =
"SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME +
" WHERE format = ? and global_id = ? and branch_id = ?";
-
- private static final String CREATE_CONFIGURED_OBJECTS_TABLE = "CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
- + " ( id VARCHAR(36) not null, object_type varchar(255), attributes blob, PRIMARY KEY (id))";
private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
+ " ( id, object_type, attributes) VALUES (?,?,?)";
private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
@@ -227,29 +150,17 @@ public class DerbyMessageStore implement
+ " where id = ?";
private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
- private final Charset UTF8_CHARSET = Charset.forName("UTF-8");
-
- private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
-
- public static final String TYPE = "DERBY";
+ protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
- private final StateManager _stateManager;
+ protected final EventManager _eventManager = new EventManager();
- private final EventManager _eventManager = new EventManager();
-
- private long _totalStoreSize;
- private boolean _limitBusted;
- private long _persistentSizeLowThreshold;
- private long _persistentSizeHighThreshold;
+ protected final StateManager _stateManager;
private MessageStoreRecoveryHandler _messageRecoveryHandler;
-
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
-
private ConfigurationRecoveryHandler _configRecoveryHandler;
- private String _storeLocation;
- public DerbyMessageStore()
+ public AbstractJDBCMessageStore()
{
_stateManager = new StateManager(_eventManager);
}
@@ -297,61 +208,23 @@ public class DerbyMessageStore implement
private void commonConfiguration(String name, Configuration storeConfiguration)
throws ClassNotFoundException, SQLException
{
- initialiseDriver();
+ implementationSpecificConfiguration(name, storeConfiguration);
+ createOrOpenDatabase();
- //Update to pick up QPID_WORK and use that as the default location not just derbyDB
-
- final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
- + File.separator + "derbyDB");
-
- if(!MEMORY_STORE_LOCATION.equals(databasePath))
- {
- File environmentPath = new File(databasePath);
- if (!environmentPath.exists())
- {
- if (!environmentPath.mkdirs())
- {
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
- }
- }
- }
+ }
- _storeLocation = databasePath;
+ protected abstract void implementationSpecificConfiguration(String name, Configuration storeConfiguration) throws ClassNotFoundException;
- _persistentSizeHighThreshold = storeConfiguration.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, -1l);
- _persistentSizeLowThreshold = storeConfiguration.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
- if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
- {
- _persistentSizeLowThreshold = _persistentSizeHighThreshold;
- }
+ abstract protected Logger getLogger();
- createOrOpenDatabase(name, databasePath);
+ abstract protected String getSqlBlobType();
- Connection conn = newAutoCommitConnection();;
- try
- {
- _totalStoreSize = getSizeOnDisk(conn);
- }
- finally
- {
- conn.close();
- }
- }
+ abstract protected String getSqlVarBinaryType(int size);
- private static synchronized void initialiseDriver() throws ClassNotFoundException
- {
- if(DRIVER_CLASS == null)
- {
- DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
- }
- }
+ abstract protected String getSqlBigIntType();
- private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
+ protected void createOrOpenDatabase() throws SQLException
{
- //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
- _connectionURL = "jdbc:derby" + (environmentPath.equals(MEMORY_STORE_LOCATION) ? environmentPath : ":" + environmentPath + "/") + name + ";create=true";
-
Connection conn = newAutoCommitConnection();
createVersionTable(conn);
@@ -366,8 +239,6 @@ public class DerbyMessageStore implement
conn.close();
}
-
-
private void createVersionTable(final Connection conn) throws SQLException
{
if(!tableExists(DB_VERSION_TABLE_NAME, conn))
@@ -403,7 +274,8 @@ public class DerbyMessageStore implement
Statement stmt = conn.createStatement();
try
{
- stmt.execute(CREATE_CONFIGURED_OBJECTS_TABLE);
+ stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))");
}
finally
{
@@ -419,7 +291,8 @@ public class DerbyMessageStore implement
Statement stmt = conn.createStatement();
try
{
- stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
+ stmt.execute("CREATE TABLE "+ QUEUE_ENTRY_TABLE_NAME +" ( queue_id varchar(36) not null, message_id "
+ + getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )");
}
finally
{
@@ -436,7 +309,13 @@ public class DerbyMessageStore implement
Statement stmt = conn.createStatement();
try
{
- stmt.execute(CREATE_META_DATA_TABLE);
+ stmt.execute("CREATE TABLE "
+ + META_DATA_TABLE_NAME
+ + " ( message_id "
+ + getSqlBigIntType()
+ + " not null, meta_data "
+ + getSqlBlobType()
+ + ", PRIMARY KEY ( message_id ) )");
}
finally
{
@@ -446,7 +325,6 @@ public class DerbyMessageStore implement
}
-
private void createMessageContentTable(final Connection conn) throws SQLException
{
if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
@@ -454,7 +332,13 @@ public class DerbyMessageStore implement
Statement stmt = conn.createStatement();
try
{
- stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
+ stmt.execute("CREATE TABLE "
+ + MESSAGE_CONTENT_TABLE_NAME
+ + " ( message_id "
+ + getSqlBigIntType()
+ + " not null, content "
+ + getSqlBlobType()
+ + ", PRIMARY KEY (message_id) )");
}
finally
{
@@ -471,7 +355,10 @@ public class DerbyMessageStore implement
Statement stmt = conn.createStatement();
try
{
- stmt.execute(CREATE_LINKS_TABLE);
+ stmt.execute("CREATE TABLE "+ LINKS_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null,"
+ + " id_msb " + getSqlBigIntType() + " not null,"
+ + " create_time " + getSqlBigIntType() + " not null,"
+ + " arguments "+getSqlBlobType()+", PRIMARY KEY ( id_lsb, id_msb ))");
}
finally
{
@@ -480,7 +367,6 @@ public class DerbyMessageStore implement
}
}
-
private void createBridgeTable(final Connection conn) throws SQLException
{
if(!tableExists(BRIDGES_TABLE_NAME, conn))
@@ -488,7 +374,12 @@ public class DerbyMessageStore implement
Statement stmt = conn.createStatement();
try
{
- stmt.execute(CREATE_BRIDGES_TABLE);
+ stmt.execute("CREATE TABLE "+ BRIDGES_TABLE_NAME +" ( id_lsb " + getSqlBigIntType() + " not null,"
+ + " id_msb " + getSqlBigIntType() + " not null,"
+ + " create_time " + getSqlBigIntType() + " not null,"
+ + " link_id_lsb " + getSqlBigIntType() + " not null,"
+ + " link_id_msb " + getSqlBigIntType() + " not null,"
+ + " arguments "+getSqlBlobType()+", PRIMARY KEY ( id_lsb, id_msb ))");
}
finally
{
@@ -504,7 +395,16 @@ public class DerbyMessageStore implement
Statement stmt = conn.createStatement();
try
{
- stmt.execute(CREATE_XIDS_TABLE);
+ stmt.execute("CREATE TABLE "
+ + XID_TABLE_NAME
+ + " ( format " + getSqlBigIntType() + " not null,"
+ + " global_id "
+ + getSqlVarBinaryType(64)
+ + ", branch_id "
+ + getSqlVarBinaryType(64)
+ + " , PRIMARY KEY ( format, "
+ +
+ "global_id, branch_id ))");
}
finally
{
@@ -513,7 +413,6 @@ public class DerbyMessageStore implement
}
}
-
private void createXidActionTable(final Connection conn) throws SQLException
{
if(!tableExists(XID_ACTIONS_TABLE_NAME, conn))
@@ -521,7 +420,12 @@ public class DerbyMessageStore implement
Statement stmt = conn.createStatement();
try
{
- stmt.execute(CREATE_XID_ACTIONS_TABLE);
+ stmt.execute("CREATE TABLE " + XID_ACTIONS_TABLE_NAME + " ( format " + getSqlBigIntType() + " not null,"
+ + " global_id " + getSqlVarBinaryType(64) + " not null, branch_id " + getSqlVarBinaryType(
+ 64) + " not null, " +
+ "action_type char not null, queue_id varchar(36) not null, message_id " + getSqlBigIntType() + " not null" +
+ ", PRIMARY KEY ( " +
+ "format, global_id, branch_id, action_type, queue_id, message_id))");
}
finally
{
@@ -530,29 +434,31 @@ public class DerbyMessageStore implement
}
}
- private boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
- PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
+ DatabaseMetaData metaData = conn.getMetaData();
+ ResultSet rs = metaData.getTables(null, null, "%", null);
+
try
{
- stmt.setString(1, tableName);
- ResultSet rs = stmt.executeQuery();
- try
- {
- return rs.next();
- }
- finally
+
+ while(rs.next())
{
- rs.close();
+ final String table = rs.getString(3);
+ if(tableName.equalsIgnoreCase(table))
+ {
+ return true;
+ }
}
+ return false;
}
finally
{
- stmt.close();
+ rs.close();
}
}
- private void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
+ protected void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
{
try
{
@@ -581,34 +487,20 @@ public class DerbyMessageStore implement
_closed.getAndSet(true);
_stateManager.attainState(State.CLOSING);
- try
- {
- Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true");
- // Shouldn't reach this point - shutdown=true should throw SQLException
- conn.close();
- _logger.error("Unable to shut down the store");
- }
- catch (SQLException e)
- {
- if (e.getSQLState().equalsIgnoreCase(DERBY_SINGLE_DB_SHUTDOWN_CODE))
- {
- //expected and represents a clean shutdown of this database only, do nothing.
- }
- else
- {
- _logger.error("Exception whilst shutting down the store: " + e);
- }
- }
+ doClose();
_stateManager.attainState(State.CLOSED);
}
+
+ protected abstract void doClose() throws Exception;
+
@Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
if(metaData.isPersistent())
{
- return new StoredDerbyMessage(_messageId.incrementAndGet(), metaData);
+ return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData);
}
else
{
@@ -637,12 +529,12 @@ public class DerbyMessageStore implement
if (results == 0)
{
- _logger.warn("Message metadata not found for message id " + messageId);
+ getLogger().warn("Message metadata not found for message id " + messageId);
}
- if (_logger.isDebugEnabled())
+ if (getLogger().isDebugEnabled())
{
- _logger.debug("Deleted metadata for message " + messageId);
+ getLogger().debug("Deleted metadata for message " + messageId);
}
stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
@@ -733,7 +625,7 @@ public class DerbyMessageStore implement
@Override
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
- _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
+ getLogger().debug("public void createQueue(AMQQueue queue = " + queue + "): called");
if (_stateManager.isInState(State.ACTIVE))
{
@@ -749,7 +641,7 @@ public class DerbyMessageStore implement
* NOTE: Currently only updates the exclusivity.
*
* @param queue The queue to update the entry for.
- * @throws AMQStoreException If the operation fails for any reason.
+ * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
*/
@Override
public void updateQueue(final AMQQueue queue) throws AMQStoreException
@@ -766,11 +658,12 @@ public class DerbyMessageStore implement
}
+
/**
* Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
* isolation and with auto-commit transactions enabled.
*/
- private Connection newAutoCommitConnection() throws SQLException
+ protected Connection newAutoCommitConnection() throws SQLException
{
final Connection connection = newConnection();
try
@@ -797,7 +690,7 @@ public class DerbyMessageStore implement
* Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
* isolation and with auto-commit transactions disabled.
*/
- private Connection newConnection() throws SQLException
+ protected Connection newConnection() throws SQLException
{
final Connection connection = DriverManager.getConnection(_connectionURL);
try
@@ -823,7 +716,7 @@ public class DerbyMessageStore implement
public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
AMQShortString name = queue.getNameShortString();
- _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+ getLogger().debug("public void removeQueue(AMQShortString name = " + name + "): called");
int results = removeConfiguredObject(queue.getId());
if (results == 0)
{
@@ -863,12 +756,10 @@ public class DerbyMessageStore implement
return argumentBytes;
}
-
-
@Override
public Transaction newTransaction()
{
- return new DerbyTransaction();
+ return new JDBCTransaction();
}
public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
@@ -878,9 +769,18 @@ public class DerbyMessageStore implement
try
{
- if (_logger.isDebugEnabled())
+ if (getLogger().isDebugEnabled())
{
- _logger.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + queue.getId()+ "[Connection" + conn + "]");
+ getLogger().debug("Enqueuing message "
+ + messageId
+ + " on queue "
+ + (queue instanceof AMQQueue
+ ? ((AMQQueue) queue).getName()
+ : "")
+ + queue.getId()
+ + "[Connection"
+ + conn
+ + "]");
}
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
@@ -894,10 +794,11 @@ public class DerbyMessageStore implement
{
stmt.close();
}
+
}
catch (SQLException e)
{
- _logger.error("Failed to enqueue: " + e.getMessage(), e);
+ getLogger().error("Failed to enqueue: " + e.getMessage(), e);
throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
+ " to database", e);
}
@@ -916,7 +817,7 @@ public class DerbyMessageStore implement
try
{
stmt.setString(1, queue.getId().toString());
- stmt.setLong(2,messageId);
+ stmt.setLong(2, messageId);
int results = stmt.executeUpdate();
@@ -927,27 +828,29 @@ public class DerbyMessageStore implement
+ " with id " + queue.getId());
}
- if (_logger.isDebugEnabled())
+ if (getLogger().isDebugEnabled())
{
- _logger.debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
- + " with id " + queue.getId());
+ getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue
+ ? ((AMQQueue) queue).getName()
+ : "")
+ + " with id " + queue.getId());
}
}
finally
{
stmt.close();
}
+
}
catch (SQLException e)
{
- _logger.error("Failed to dequeue: " + e.getMessage(), e);
+ getLogger().error("Failed to dequeue: " + e.getMessage(), e);
throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ " with id " + queue.getId() + " from database", e);
}
}
-
private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId)
throws AMQStoreException
{
@@ -993,13 +896,12 @@ public class DerbyMessageStore implement
}
catch (SQLException e)
{
- _logger.error("Failed to dequeue: " + e.getMessage(), e);
+ getLogger().error("Failed to dequeue: " + e.getMessage(), e);
throw new AMQStoreException("Error deleting enqueued message with xid", e);
}
}
-
private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws AMQStoreException
{
@@ -1061,7 +963,7 @@ public class DerbyMessageStore implement
}
catch (SQLException e)
{
- _logger.error("Failed to enqueue: " + e.getMessage(), e);
+ getLogger().error("Failed to enqueue: " + e.getMessage(), e);
throw new AMQStoreException("Error writing xid ", e);
}
@@ -1091,9 +993,9 @@ public class DerbyMessageStore implement
Connection conn = connWrapper.getConnection();
conn.commit();
- if (_logger.isDebugEnabled())
+ if (getLogger().isDebugEnabled())
{
- _logger.debug("commit tran completed");
+ getLogger().debug("commit tran completed");
}
conn.close();
@@ -1121,9 +1023,9 @@ public class DerbyMessageStore implement
throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran");
}
- if (_logger.isDebugEnabled())
+ if (getLogger().isDebugEnabled())
{
- _logger.debug("abort tran called: " + connWrapper.getConnection());
+ getLogger().debug("abort tran called: " + connWrapper.getConnection());
}
try
@@ -1144,13 +1046,12 @@ public class DerbyMessageStore implement
return _messageId.incrementAndGet();
}
-
private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
throws SQLException
{
- if(_logger.isDebugEnabled())
+ if(getLogger().isDebugEnabled())
{
- _logger.debug("Adding metadata for message " +messageId);
+ getLogger().debug("Adding metadata for message " + messageId);
}
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
@@ -1161,7 +1062,7 @@ public class DerbyMessageStore implement
final int bodySize = 1 + metaData.getStorableSize();
byte[] underlying = new byte[bodySize];
underlying[0] = (byte) metaData.getType().ordinal();
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
buf.position(1);
buf = buf.slice();
@@ -1198,10 +1099,7 @@ public class DerbyMessageStore implement
}
-
-
-
- private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
+ protected void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1221,20 +1119,19 @@ public class DerbyMessageStore implement
{
long messageId = rs.getLong(1);
- Blob dataAsBlob = rs.getBlob(2);
-
if(messageId > maxId)
{
maxId = messageId;
}
- byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
buf.position(1);
buf = buf.slice();
MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
- StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true);
+ StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
messageHandler.message(message);
}
@@ -1259,8 +1156,7 @@ public class DerbyMessageStore implement
}
-
- private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+ protected TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1378,7 +1274,7 @@ public class DerbyMessageStore implement
}
}
- private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
+ protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1481,10 +1377,8 @@ public class DerbyMessageStore implement
if(rs.next())
{
- Blob dataAsBlob = rs.getBlob(1);
-
- byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ byte[] dataAsBytes = getBlobAsBytes(rs, 1);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
buf.position(1);
buf = buf.slice();
MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
@@ -1513,12 +1407,13 @@ public class DerbyMessageStore implement
}
}
+ protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException;
private void addContent(Connection conn, long messageId, ByteBuffer src)
{
- if(_logger.isDebugEnabled())
+ if(getLogger().isDebugEnabled())
{
- _logger.debug("Adding content for message " +messageId);
+ getLogger().debug("Adding content for message " + messageId);
}
PreparedStatement stmt = null;
@@ -1548,7 +1443,6 @@ public class DerbyMessageStore implement
}
-
public int getContent(long messageId, int offset, ByteBuffer dst)
{
Connection conn = null;
@@ -1567,10 +1461,8 @@ public class DerbyMessageStore implement
if (rs.next())
{
- Blob dataAsBlob = rs.getBlob(1);
-
- final int size = (int) dataAsBlob.length();
- byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
+ byte[] dataAsBytes = getBlobAsBytes(rs, 1);
+ int size = dataAsBytes.length;
if (offset > size)
{
@@ -1611,13 +1503,13 @@ public class DerbyMessageStore implement
}
- private class DerbyTransaction implements Transaction
+ protected class JDBCTransaction implements Transaction
{
private final ConnectionWrapper _connWrapper;
private int _storeSizeIncrease;
- private DerbyTransaction()
+ protected JDBCTransaction()
{
try
{
@@ -1633,11 +1525,11 @@ public class DerbyMessageStore implement
public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
final StoredMessage storedMessage = message.getStoredMessage();
- if(storedMessage instanceof StoredDerbyMessage)
+ if(storedMessage instanceof StoredJDBCMessage)
{
try
{
- ((StoredDerbyMessage) storedMessage).store(_connWrapper.getConnection());
+ ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
}
catch (SQLException e)
{
@@ -1645,27 +1537,27 @@ public class DerbyMessageStore implement
}
}
_storeSizeIncrease += storedMessage.getMetaData().getContentSize();
- DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
+ AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
@Override
public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
+ AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
}
@Override
public void commitTran() throws AMQStoreException
{
- DerbyMessageStore.this.commitTran(_connWrapper);
+ AbstractJDBCMessageStore.this.commitTran(_connWrapper);
storedSizeChange(_storeSizeIncrease);
}
@Override
public StoreFuture commitTranAsync() throws AMQStoreException
{
- final StoreFuture storeFuture = DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ final StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
return storeFuture;
}
@@ -1673,26 +1565,24 @@ public class DerbyMessageStore implement
@Override
public void abortTran() throws AMQStoreException
{
- DerbyMessageStore.this.abortTran(_connWrapper);
+ AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
@Override
public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
{
- DerbyMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
+ AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
}
@Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
throws AMQStoreException
{
- DerbyMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
+ AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
}
}
-
-
- private class StoredDerbyMessage implements StoredMessage
+ private class StoredJDBCMessage implements StoredMessage
{
private final long _messageId;
@@ -1704,14 +1594,14 @@ public class DerbyMessageStore implement
private volatile SoftReference<byte[]> _dataRef;
- StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
+ StoredJDBCMessage(long messageId, StorableMessageMetaData metaData)
{
this(messageId, metaData, false);
}
- StoredDerbyMessage(long messageId,
- StorableMessageMetaData metaData, boolean isRecovered)
+ StoredJDBCMessage(long messageId,
+ StorableMessageMetaData metaData, boolean isRecovered)
{
_messageId = messageId;
_isRecovered = isRecovered;
@@ -1731,7 +1621,7 @@ public class DerbyMessageStore implement
{
try
{
- metaData = DerbyMessageStore.this.getMetaData(_messageId);
+ metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId);
}
catch (SQLException e)
{
@@ -1750,7 +1640,7 @@ public class DerbyMessageStore implement
}
@Override
- public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
+ public void addContent(int offsetInMessage, ByteBuffer src)
{
src = src.slice();
@@ -1773,7 +1663,7 @@ public class DerbyMessageStore implement
}
@Override
- public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
+ public int getContent(int offsetInMessage, ByteBuffer dst)
{
byte[] data = _dataRef == null ? null : _dataRef.get();
if(data != null)
@@ -1784,7 +1674,7 @@ public class DerbyMessageStore implement
}
else
{
- return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
}
@@ -1817,9 +1707,9 @@ public class DerbyMessageStore implement
}
catch (SQLException e)
{
- if(_logger.isDebugEnabled())
+ if(getLogger().isDebugEnabled())
{
- _logger.debug("Error when trying to flush message " + _messageId + " to store: " + e);
+ getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e);
}
throw new RuntimeException(e);
}
@@ -1834,7 +1724,7 @@ public class DerbyMessageStore implement
public void remove()
{
int delta = getMetaData().getContentSize();
- DerbyMessageStore.this.removeMessage(_messageId);
+ AbstractJDBCMessageStore.this.removeMessage(_messageId);
storedSizeChange(-delta);
}
@@ -1845,7 +1735,7 @@ public class DerbyMessageStore implement
try
{
storeMetaData(conn, _messageId, _metaData);
- DerbyMessageStore.this.addContent(conn, _messageId,
+ AbstractJDBCMessageStore.this.addContent(conn, _messageId,
_data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
}
finally
@@ -1854,9 +1744,9 @@ public class DerbyMessageStore implement
_data = null;
}
- if(_logger.isDebugEnabled())
+ if(getLogger().isDebugEnabled())
{
- _logger.debug("Storing message " + _messageId + " to store");
+ getLogger().debug("Storing message " + _messageId + " to store");
}
}
}
@@ -1867,7 +1757,7 @@ public class DerbyMessageStore implement
}
}
- private void closeConnection(final Connection conn)
+ protected void closeConnection(final Connection conn)
{
if(conn != null)
{
@@ -1877,12 +1767,12 @@ public class DerbyMessageStore implement
}
catch (SQLException e)
{
- _logger.error("Problem closing connection", e);
+ getLogger().error("Problem closing connection", e);
}
}
}
- private void closePreparedStatement(final PreparedStatement stmt)
+ protected void closePreparedStatement(final PreparedStatement stmt)
{
if (stmt != null)
{
@@ -1892,7 +1782,7 @@ public class DerbyMessageStore implement
}
catch(SQLException e)
{
- _logger.error("Problem closing prepared statement", e);
+ getLogger().error("Problem closing prepared statement", e);
}
}
}
@@ -1903,12 +1793,6 @@ public class DerbyMessageStore implement
_eventManager.addEventListener(eventListener, events);
}
- @Override
- public String getStoreLocation()
- {
- return _storeLocation;
- }
-
private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws AMQStoreException
{
if (_stateManager.isInState(State.ACTIVE))
@@ -2085,12 +1969,7 @@ public class DerbyMessageStore implement
if (rs.next())
{
String type = rs.getString(1);
- Blob blob = rs.getBlob(2);
- String attributes = null;
- if (blob != null)
- {
- attributes = blobToString(blob);
- }
+ String attributes = getBlobAsString(rs, 2);
result = new ConfiguredObjectRecord(id, type, attributes);
}
}
@@ -2117,12 +1996,6 @@ public class DerbyMessageStore implement
return result;
}
- private String blobToString(Blob blob) throws SQLException
- {
- byte[] bytes = blob.getBytes(1, (int)blob.length());
- return new String(bytes, UTF8_CHARSET);
- }
-
private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException
{
ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
@@ -2139,7 +2012,7 @@ public class DerbyMessageStore implement
{
String id = rs.getString(1);
String objectType = rs.getString(2);
- String attributes = blobToString(rs.getBlob(3));
+ String attributes = getBlobAsString(rs, 3);
results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes));
}
}
@@ -2160,180 +2033,8 @@ public class DerbyMessageStore implement
return results;
}
- private synchronized void storedSizeChange(final int delta)
- {
- if(getPersistentSizeHighThreshold() > 0)
- {
- synchronized(this)
- {
- // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
- // time, so we do so only when there's been enough change that it is worth looking again. We do this by
- // assuming the total size will change by less than twice the amount of the message data change.
- long newSize = _totalStoreSize += 3*delta;
-
- Connection conn = null;
- try
- {
-
- if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
- {
- conn = newAutoCommitConnection();
- _totalStoreSize = getSizeOnDisk(conn);
- if(_totalStoreSize > getPersistentSizeHighThreshold())
- {
- _limitBusted = true;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- }
- }
- else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
- {
- long oldSize = _totalStoreSize;
- conn = newAutoCommitConnection();
- _totalStoreSize = getSizeOnDisk(conn);
- if(oldSize <= _totalStoreSize)
- {
-
- reduceSizeOnDisk(conn);
-
- _totalStoreSize = getSizeOnDisk(conn);
- }
-
- if(_totalStoreSize < getPersistentSizeLowThreshold())
- {
- _limitBusted = false;
- _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- }
-
-
- }
- }
- catch (SQLException e)
- {
- closeConnection(conn);
- throw new RuntimeException("Exception will processing store size change", e);
- }
- }
- }
- }
-
- private void reduceSizeOnDisk(Connection conn)
- {
- CallableStatement cs = null;
- PreparedStatement stmt = null;
- try
- {
- String tableQuery =
- "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
- stmt = conn.prepareStatement(tableQuery);
- ResultSet rs = null;
-
- List<String> schemas = new ArrayList<String>();
- List<String> tables = new ArrayList<String>();
-
- try
- {
- rs = stmt.executeQuery();
- while(rs.next())
- {
- schemas.add(rs.getString(1));
- tables.add(rs.getString(2));
- }
- }
- finally
- {
- if(rs != null)
- {
- rs.close();
- }
- }
-
+ protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
- cs = conn.prepareCall
- ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
-
- for(int i = 0; i < schemas.size(); i++)
- {
- cs.setString(1, schemas.get(i));
- cs.setString(2, tables.get(i));
- cs.setShort(3, (short) 0);
- cs.execute();
- }
- }
- catch (SQLException e)
- {
- closeConnection(conn);
- throw new RuntimeException("Error reducing on disk size", e);
- }
- finally
- {
- closePreparedStatement(stmt);
- closePreparedStatement(cs);
- }
-
- }
-
- private long getSizeOnDisk(Connection conn)
- {
- PreparedStatement stmt = null;
- try
- {
- String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
- " FROM " +
- " SYS.SYSTABLES systabs," +
- " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
- " WHERE systabs.tabletype = 'T'";
-
- stmt = conn.prepareStatement(sizeQuery);
-
- ResultSet rs = null;
- long size = 0l;
-
- try
- {
- rs = stmt.executeQuery();
- while(rs.next())
- {
- size = rs.getLong(1);
- }
- }
- finally
- {
- if(rs != null)
- {
- rs.close();
- }
- }
-
- return size;
-
- }
- catch (SQLException e)
- {
- closeConnection(conn);
- throw new RuntimeException("Error establishing on disk size", e);
- }
- finally
- {
- closePreparedStatement(stmt);
- }
-
- }
-
-
- private long getPersistentSizeLowThreshold()
- {
- return _persistentSizeLowThreshold;
- }
-
- private long getPersistentSizeHighThreshold()
- {
- return _persistentSizeHighThreshold;
- }
-
- @Override
- public String getStoreType()
- {
- return TYPE;
- }
+ protected abstract void storedSizeChange(int storeSizeIncrease);
-}
\ No newline at end of file
+}
Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Apr 22 11:15:21 2013
@@ -0,0 +1,6 @@
+/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:886720-886722,887145,892761,930288
+/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:805429-821809
+/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:787599
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:1061302-1072333
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org