You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC
svn commit: r563982 [18/32] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jm...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Wed Aug 8 11:56:59 2007
@@ -97,16 +97,17 @@
public String[] getDropSchemaStatements() {
if (dropSchemaStatements == null) {
- dropSchemaStatements = new String[] { "DROP TABLE " + getFullAckTableName() + "",
- "DROP TABLE " + getFullMessageTableName() + "", };
+ dropSchemaStatements = new String[] {"DROP TABLE " + getFullAckTableName() + "",
+ "DROP TABLE " + getFullMessageTableName() + "",};
}
return dropSchemaStatements;
}
public String getAddMessageStatement() {
if (addMessageStatement == null) {
- addMessageStatement = "INSERT INTO " + getFullMessageTableName()
- + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, MSG) VALUES (?, ?, ?, ?, ?, ?)";
+ addMessageStatement = "INSERT INTO "
+ + getFullMessageTableName()
+ + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, MSG) VALUES (?, ?, ?, ?, ?, ?)";
}
return addMessageStatement;
}
@@ -128,7 +129,7 @@
public String getFindMessageSequenceIdStatement() {
if (findMessageSequenceIdStatement == null) {
findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName()
- + " WHERE MSGID_PROD=? AND MSGID_SEQ=?";
+ + " WHERE MSGID_PROD=? AND MSGID_SEQ=?";
}
return findMessageSequenceIdStatement;
}
@@ -143,7 +144,7 @@
public String getFindAllMessagesStatement() {
if (findAllMessagesStatement == null) {
findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
- + " WHERE CONTAINER=? ORDER BY ID";
+ + " WHERE CONTAINER=? ORDER BY ID";
}
return findAllMessagesStatement;
}
@@ -164,8 +165,10 @@
public String getCreateDurableSubStatement() {
if (createDurableSubStatement == null) {
- createDurableSubStatement = "INSERT INTO " + getFullAckTableName()
- + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST) " + "VALUES (?, ?, ?, ?, ?, ?)";
+ createDurableSubStatement = "INSERT INTO "
+ + getFullAckTableName()
+ + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST) "
+ + "VALUES (?, ?, ?, ?, ?, ?)";
}
return createDurableSubStatement;
}
@@ -173,15 +176,15 @@
public String getFindDurableSubStatement() {
if (findDurableSubStatement == null) {
findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName()
- + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+ + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return findDurableSubStatement;
}
public String getFindAllDurableSubsStatement() {
if (findAllDurableSubsStatement == null) {
- findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " + getFullAckTableName()
- + " WHERE CONTAINER=?";
+ findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM "
+ + getFullAckTableName() + " WHERE CONTAINER=?";
}
return findAllDurableSubsStatement;
}
@@ -189,7 +192,7 @@
public String getUpdateLastAckOfDurableSubStatement() {
if (updateLastAckOfDurableSubStatement == null) {
updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?"
- + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+ + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return updateLastAckOfDurableSubStatement;
}
@@ -197,62 +200,76 @@
public String getDeleteSubscriptionStatement() {
if (deleteSubscriptionStatement == null) {
deleteSubscriptionStatement = "DELETE FROM " + getFullAckTableName()
- + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+ + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return deleteSubscriptionStatement;
}
public String getFindAllDurableSubMessagesStatement() {
if (findAllDurableSubMessagesStatement == null) {
- findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
- + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID";
+ findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName()
+ + " M, " + getFullAckTableName() + " D "
+ + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+ + " ORDER BY M.ID";
}
return findAllDurableSubMessagesStatement;
}
-
- public String getFindDurableSubMessagesStatement(){
- if(findDurableSubMessagesStatement==null){
- findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
- + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID";
+
+ public String getFindDurableSubMessagesStatement() {
+ if (findDurableSubMessagesStatement == null) {
+ findDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
+ + getFullAckTableName() + " D "
+ + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?"
+ + " ORDER BY M.ID";
}
return findDurableSubMessagesStatement;
}
-
+
public String findAllDurableSubMessagesStatement() {
if (findAllDurableSubMessagesStatement == null) {
- findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
- + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID";
+ findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName()
+ + " M, " + getFullAckTableName() + " D "
+ + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
+ + " ORDER BY M.ID";
}
return findAllDurableSubMessagesStatement;
}
-
- public String getNextDurableSubscriberMessageStatement(){
- if (nextDurableSubscriberMessageStatement == null){
- nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
- + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID ";
+
+ public String getNextDurableSubscriberMessageStatement() {
+ if (nextDurableSubscriberMessageStatement == null) {
+ nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "
+ + getFullMessageTableName()
+ + " M, "
+ + getFullAckTableName()
+ + " D "
+ + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?"
+ + " ORDER BY M.ID ";
}
return nextDurableSubscriberMessageStatement;
}
-
+
/**
* @return the durableSubscriberMessageCountStatement
*/
-
-
- public String getDurableSubscriberMessageCountStatement(){
- if (durableSubscriberMessageCountStatement==null){
- durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM " + getFullMessageTableName() + " M, "
- + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
+
+ public String getDurableSubscriberMessageCountStatement() {
+ if (durableSubscriberMessageCountStatement == null) {
+ durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM "
+ + getFullMessageTableName()
+ + " M, "
+ + getFullAckTableName()
+ + " D "
+ + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
}
return durableSubscriberMessageCountStatement;
}
-
- public String getFindAllDestinationsStatement() {
+
+ public String getFindAllDestinationsStatement() {
if (findAllDestinationsStatement == null) {
findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName();
}
@@ -276,13 +293,15 @@
public String getDeleteOldMessagesStatement() {
if (deleteOldMessagesStatement == null) {
deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
- + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= " + "( SELECT min(" + getFullAckTableName()
- + ".LAST_ACKED_ID) " + "FROM " + getFullAckTableName() + " WHERE " + getFullAckTableName()
- + ".CONTAINER=" + getFullMessageTableName() + ".CONTAINER)";
+ + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= "
+ + "( SELECT min(" + getFullAckTableName() + ".LAST_ACKED_ID) "
+ + "FROM " + getFullAckTableName() + " WHERE "
+ + getFullAckTableName() + ".CONTAINER=" + getFullMessageTableName()
+ + ".CONTAINER)";
}
return deleteOldMessagesStatement;
}
-
+
public String getLockCreateStatement() {
if (lockCreateStatement == null) {
lockCreateStatement = "SELECT * FROM " + getFullLockTableName();
@@ -293,21 +312,21 @@
}
return lockCreateStatement;
}
-
+
public String getLockUpdateStatement() {
if (lockUpdateStatement == null) {
lockUpdateStatement = "UPDATE " + getFullLockTableName() + " SET time = ? WHERE ID = 1";
}
return lockUpdateStatement;
}
-
+
/**
* @return the destinationMessageCountStatement
*/
- public String getDestinationMessageCountStatement(){
- if (destinationMessageCountStatement==null) {
- destinationMessageCountStatement= "SELECT COUNT(*) FROM " + getFullMessageTableName()
- + " WHERE CONTAINER=?";
+ public String getDestinationMessageCountStatement() {
+ if (destinationMessageCountStatement == null) {
+ destinationMessageCountStatement = "SELECT COUNT(*) FROM " + getFullMessageTableName()
+ + " WHERE CONTAINER=?";
}
return destinationMessageCountStatement;
}
@@ -315,27 +334,26 @@
/**
* @return the findNextMessagesStatement
*/
- public String getFindNextMessagesStatement(){
- if(findNextMessagesStatement == null) {
- findNextMessagesStatement="SELECT ID, MSG FROM " + getFullMessageTableName()
- + " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
+ public String getFindNextMessagesStatement() {
+ if (findNextMessagesStatement == null) {
+ findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ + " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
}
return findNextMessagesStatement;
}
-
+
/**
* @return the lastAckedDurableSubscriberMessageStatement
*/
- public String getLastAckedDurableSubscriberMessageStatement(){
- if(lastAckedDurableSubscriberMessageStatement==null) {
- lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName()
- + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+ public String getLastAckedDurableSubscriberMessageStatement() {
+ if (lastAckedDurableSubscriberMessageStatement == null) {
+ lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM "
+ + getFullAckTableName()
+ + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return lastAckedDurableSubscriberMessageStatement;
}
-
-
public String getFullMessageTableName() {
return getTablePrefix() + getMessageTableName();
}
@@ -343,12 +361,11 @@
public String getFullAckTableName() {
return getTablePrefix() + getDurableSubAcksTableName();
}
-
+
public String getFullLockTableName() {
return getTablePrefix() + getLockTableName();
}
-
/**
* @return Returns the containerNameDataType.
*/
@@ -357,8 +374,7 @@
}
/**
- * @param containerNameDataType
- * The containerNameDataType to set.
+ * @param containerNameDataType The containerNameDataType to set.
*/
public void setContainerNameDataType(String containerNameDataType) {
this.containerNameDataType = containerNameDataType;
@@ -372,8 +388,7 @@
}
/**
- * @param messageDataType
- * The messageDataType to set.
+ * @param messageDataType The messageDataType to set.
*/
public void setBinaryDataType(String messageDataType) {
this.binaryDataType = messageDataType;
@@ -387,8 +402,7 @@
}
/**
- * @param messageTableName
- * The messageTableName to set.
+ * @param messageTableName The messageTableName to set.
*/
public void setMessageTableName(String messageTableName) {
this.messageTableName = messageTableName;
@@ -402,8 +416,7 @@
}
/**
- * @param msgIdDataType
- * The msgIdDataType to set.
+ * @param msgIdDataType The msgIdDataType to set.
*/
public void setMsgIdDataType(String msgIdDataType) {
this.msgIdDataType = msgIdDataType;
@@ -417,8 +430,7 @@
}
/**
- * @param sequenceDataType
- * The sequenceDataType to set.
+ * @param sequenceDataType The sequenceDataType to set.
*/
public void setSequenceDataType(String sequenceDataType) {
this.sequenceDataType = sequenceDataType;
@@ -432,8 +444,7 @@
}
/**
- * @param tablePrefix
- * The tablePrefix to set.
+ * @param tablePrefix The tablePrefix to set.
*/
public void setTablePrefix(String tablePrefix) {
this.tablePrefix = tablePrefix;
@@ -447,13 +458,12 @@
}
/**
- * @param durableSubAcksTableName
- * The durableSubAcksTableName to set.
+ * @param durableSubAcksTableName The durableSubAcksTableName to set.
*/
public void setDurableSubAcksTableName(String durableSubAcksTableName) {
this.durableSubAcksTableName = durableSubAcksTableName;
}
-
+
public String getLockTableName() {
return lockTableName;
}
@@ -583,52 +593,47 @@
}
/**
- * @param findDurableSubMessagesStatement the findDurableSubMessagesStatement to set
+ * @param findDurableSubMessagesStatement the
+ * findDurableSubMessagesStatement to set
*/
- public void setFindDurableSubMessagesStatement(String findDurableSubMessagesStatement){
- this.findDurableSubMessagesStatement=findDurableSubMessagesStatement;
+ public void setFindDurableSubMessagesStatement(String findDurableSubMessagesStatement) {
+ this.findDurableSubMessagesStatement = findDurableSubMessagesStatement;
}
/**
* @param nextDurableSubscriberMessageStatement the nextDurableSubscriberMessageStatement to set
*/
- public void setNextDurableSubscriberMessageStatement(String nextDurableSubscriberMessageStatement){
- this.nextDurableSubscriberMessageStatement=nextDurableSubscriberMessageStatement;
+ public void setNextDurableSubscriberMessageStatement(String nextDurableSubscriberMessageStatement) {
+ this.nextDurableSubscriberMessageStatement = nextDurableSubscriberMessageStatement;
}
-
/**
* @param durableSubscriberMessageCountStatement the durableSubscriberMessageCountStatement to set
*/
- public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){
- this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement;
- }
-
+ public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement) {
+ this.durableSubscriberMessageCountStatement = durableSubscriberMessageCountStatement;
+ }
+
/**
* @param findNextMessagesStatement the findNextMessagesStatement to set
*/
- public void setFindNextMessagesStatement(String findNextMessagesStatement){
- this.findNextMessagesStatement=findNextMessagesStatement;
+ public void setFindNextMessagesStatement(String findNextMessagesStatement) {
+ this.findNextMessagesStatement = findNextMessagesStatement;
}
/**
* @param destinationMessageCountStatement the destinationMessageCountStatement to set
*/
- public void setDestinationMessageCountStatement(String destinationMessageCountStatement){
- this.destinationMessageCountStatement=destinationMessageCountStatement;
+ public void setDestinationMessageCountStatement(String destinationMessageCountStatement) {
+ this.destinationMessageCountStatement = destinationMessageCountStatement;
}
-
-
-
/**
* @param lastAckedDurableSubscriberMessageStatement the lastAckedDurableSubscriberMessageStatement to set
*/
- public void setLastAckedDurableSubscriberMessageStatement(String lastAckedDurableSubscriberMessageStatement){
- this.lastAckedDurableSubscriberMessageStatement=lastAckedDurableSubscriberMessageStatement;
+ public void setLastAckedDurableSubscriberMessageStatement(
+ String lastAckedDurableSubscriberMessageStatement) {
+ this.lastAckedDurableSubscriberMessageStatement = lastAckedDurableSubscriberMessageStatement;
}
-
-
-
-
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java Wed Aug 8 11:56:59 2007
@@ -36,7 +36,7 @@
public class TransactionContext {
private static final Log log = LogFactory.getLog(TransactionContext.class);
-
+
private final DataSource dataSource;
private Connection connection;
private boolean inTx;
@@ -49,7 +49,7 @@
}
public Connection getConnection() throws IOException {
- if( connection == null ) {
+ if (connection == null) {
try {
connection = dataSource.getConnection();
boolean autoCommit = !inTx;
@@ -60,7 +60,7 @@
JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
throw IOExceptionSupport.create(e);
}
-
+
try {
connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
} catch (Throwable e) {
@@ -77,51 +77,54 @@
try {
executeBatch(removedMessageStatement, "Failed to remove a message");
} finally {
- removedMessageStatement=null;
+ removedMessageStatement = null;
try {
executeBatch(updateLastAckStatement, "Failed to ack a message");
} finally {
- updateLastAckStatement=null;
+ updateLastAckStatement = null;
}
}
}
}
private void executeBatch(PreparedStatement p, String message) throws SQLException {
- if( p == null )
+ if (p == null)
return;
-
+
try {
int[] rc = p.executeBatch();
for (int i = 0; i < rc.length; i++) {
int code = rc[i];
- if ( code < 0 && code != Statement.SUCCESS_NO_INFO ) {
+ if (code < 0 && code != Statement.SUCCESS_NO_INFO) {
throw new SQLException(message + ". Response code: " + code);
}
}
} finally {
- try { p.close(); } catch (Throwable e) { }
+ try {
+ p.close();
+ } catch (Throwable e) {
+ }
}
}
-
+
public void close() throws IOException {
- if( !inTx ) {
+ if (!inTx) {
try {
-
+
/**
* we are not in a transaction so should not be committing ??
- * This was previously commented out - but had
- * adverse affects on testing - so it's back!
+ * This was previously commented out - but had adverse effects
+ * on testing - so it's back!
*
*/
- try{
+ try {
executeBatch();
} finally {
if (connection != null && !connection.getAutoCommit()) {
connection.commit();
}
}
-
+
} catch (SQLException e) {
JDBCPersistenceAdapter.log("Error while closing connection: ", e);
throw IOExceptionSupport.create(e);
@@ -131,60 +134,60 @@
connection.close();
}
} catch (Throwable e) {
- log.warn("Close failed: "+e.getMessage(), e);
+ log.warn("Close failed: " + e.getMessage(), e);
} finally {
- connection=null;
+ connection = null;
}
}
}
}
public void begin() throws IOException {
- if( inTx )
+ if (inTx)
throw new IOException("Already started.");
inTx = true;
connection = getConnection();
}
public void commit() throws IOException {
- if( !inTx )
+ if (!inTx)
throw new IOException("Not started.");
try {
executeBatch();
- if( !connection.getAutoCommit() )
+ if (!connection.getAutoCommit())
connection.commit();
} catch (SQLException e) {
JDBCPersistenceAdapter.log("Commit failed: ", e);
throw IOExceptionSupport.create(e);
} finally {
- inTx=false;
+ inTx = false;
close();
}
}
-
+
public void rollback() throws IOException {
- if( !inTx )
+ if (!inTx)
throw new IOException("Not started.");
try {
- if( addMessageStatement != null ) {
+ if (addMessageStatement != null) {
addMessageStatement.close();
- addMessageStatement=null;
+ addMessageStatement = null;
}
- if( removedMessageStatement != null ) {
+ if (removedMessageStatement != null) {
removedMessageStatement.close();
- removedMessageStatement=null;
+ removedMessageStatement = null;
}
- if( updateLastAckStatement != null ) {
+ if (updateLastAckStatement != null) {
updateLastAckStatement.close();
- updateLastAckStatement=null;
+ updateLastAckStatement = null;
}
connection.rollback();
-
+
} catch (SQLException e) {
JDBCPersistenceAdapter.log("Rollback failed: ", e);
throw IOExceptionSupport.create(e);
} finally {
- inTx=false;
+ inTx = false;
close();
}
}
@@ -192,6 +195,7 @@
public PreparedStatement getAddMessageStatement() {
return addMessageStatement;
}
+
public void setAddMessageStatement(PreparedStatement addMessageStatement) {
this.addMessageStatement = addMessageStatement;
}
@@ -199,6 +203,7 @@
public PreparedStatement getUpdateLastAckStatement() {
return updateLastAckStatement;
}
+
public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) {
this.updateLastAckStatement = ackMessageStatement;
}
@@ -206,6 +211,7 @@
public PreparedStatement getRemovedMessageStatement() {
return removedMessageStatement;
}
+
public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) {
this.removedMessageStatement = removedMessageStatement;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java Wed Aug 8 11:56:59 2007
@@ -30,19 +30,17 @@
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.ByteArrayOutputStream;
-
/**
- * This JDBCAdapter inserts and extracts BLOB data using the
- * getBlob()/setBlob() operations. This is a little more involved
- * since to insert a blob you have to:
+ * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob()
+ * operations. This is a little more involved since to insert a blob you have
+ * to:
*
- * 1: insert empty blob.
- * 2: select the blob
- * 3: finally update the blob with data value.
+ * 1: insert empty blob. 2: select the blob 3: finally update the blob with data
+ * value.
*
* The databases/JDBC drivers that use this adapter are:
* <ul>
- * <li></li>
+ * <li></li>
* </ul>
*
* @org.apache.xbean.XBean element="blobJDBCAdapter"
@@ -50,23 +48,22 @@
* @version $Revision: 1.2 $
*/
public class BlobJDBCAdapter extends DefaultJDBCAdapter {
-
- public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data) throws SQLException,
- JMSException {
+
+ public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data)
+ throws SQLException, JMSException {
PreparedStatement s = null;
ResultSet rs = null;
try {
-
+
// Add the Blob record.
s = c.prepareStatement(statements.getAddMessageStatement());
s.setLong(1, seq);
s.setString(2, destinationName);
s.setString(3, messageID);
s.setString(4, " ");
-
+
if (s.executeUpdate() != 1)
- throw new JMSException("Failed to broker message: " + messageID
- + " in container.");
+ throw new JMSException("Failed to broker message: " + messageID + " in container.");
s.close();
// Select the blob record so that we can update it.
@@ -74,8 +71,7 @@
s.setLong(1, seq);
rs = s.executeQuery();
if (!rs.next())
- throw new JMSException("Failed to broker message: " + messageID
- + " in container.");
+ throw new JMSException("Failed to broker message: " + messageID + " in container.");
// Update the blob
Blob blob = rs.getBlob(1);
@@ -90,8 +86,7 @@
s.setLong(2, seq);
} catch (IOException e) {
- throw (SQLException) new SQLException("BLOB could not be updated: "
- + e).initCause(e);
+ throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
} finally {
try {
rs.close();
@@ -103,37 +98,44 @@
}
}
}
-
+
public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException {
- PreparedStatement s=null; ResultSet rs=null;
- try {
-
- s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
- s.setLong(1, seq);
- rs = s.executeQuery();
-
- if( !rs.next() )
- return null;
- Blob blob = rs.getBlob(1);
- InputStream is = blob.getBinaryStream();
-
- ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
- int ch;
- while( (ch=is.read())>= 0 ) {
- os.write(ch);
- }
- is.close();
- os.close();
-
- return os.toByteArray();
-
- } catch (IOException e) {
- throw (SQLException) new SQLException("BLOB could not be updated: "
- + e).initCause(e);
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+
+ s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
+ s.setLong(1, seq);
+ rs = s.executeQuery();
+
+ if (!rs.next()) {
+ return null;
+ }
+ Blob blob = rs.getBlob(1);
+ InputStream is = blob.getBinaryStream();
+
+ ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
+ int ch;
+ while ((ch = is.read()) >= 0) {
+ os.write(ch);
+ }
+ is.close();
+ os.close();
+
+ return os.toByteArray();
+
+ } catch (IOException e) {
+ throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
} finally {
- try { rs.close(); } catch (Throwable e) {}
- try { s.close(); } catch (Throwable e) {}
- }
+ try {
+ rs.close();
+ } catch (Throwable ignore) {
+ }
+ try {
+ s.close();
+ } catch (Throwable ignore) {
+ }
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Aug 8 11:56:59 2007
@@ -34,10 +34,12 @@
import org.apache.commons.logging.LogFactory;
/**
- * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
- * encouraged to override the default implementation of methods to account for differences in JDBC Driver
- * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
- * The databases/JDBC drivers that use this adapter are:
+ * Implements all the default JDBC operations that are used by the
+ * JDBCPersistenceAdapter. <p/> sub-classing is encouraged to override the
+ * default implementation of methods to account for differences in JDBC Driver
+ * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using
+ * the getBytes()/setBytes() operations. <p/> The databases/JDBC drivers that
+ * use this adapter are:
* <ul>
* <li></li>
* </ul>
@@ -46,381 +48,390 @@
*
* @version $Revision: 1.10 $
*/
-public class DefaultJDBCAdapter implements JDBCAdapter{
+public class DefaultJDBCAdapter implements JDBCAdapter {
- private static final Log log=LogFactory.getLog(DefaultJDBCAdapter.class);
+ private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
protected Statements statements;
- protected boolean batchStatments=true;
+ protected boolean batchStatments = true;
- protected void setBinaryData(PreparedStatement s,int index,byte data[]) throws SQLException{
- s.setBytes(index,data);
+ protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
+ s.setBytes(index, data);
}
- protected byte[] getBinaryData(ResultSet rs,int index) throws SQLException{
+ protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
return rs.getBytes(index);
}
- public void doCreateTables(TransactionContext c) throws SQLException,IOException{
- Statement s=null;
- try{
- // Check to see if the table already exists. If it does, then don't log warnings during startup.
- // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version
+ public void doCreateTables(TransactionContext c) throws SQLException, IOException {
+ Statement s = null;
+ try {
+ // Check to see if the table already exists. If it does, then don't
+ // log warnings during startup.
+ // Need to run the scripts anyways since they may contain ALTER
+ // statements that upgrade a previous version
// of the table
- boolean alreadyExists=false;
- ResultSet rs=null;
- try{
- rs=c.getConnection().getMetaData().getTables(null,null,statements.getFullMessageTableName(),
- new String[] { "TABLE" });
- alreadyExists=rs.next();
- }catch(Throwable ignore){
- }finally{
+ boolean alreadyExists = false;
+ ResultSet rs = null;
+ try {
+ rs = c.getConnection().getMetaData().getTables(null, null,
+ statements.getFullMessageTableName(),
+ new String[] {"TABLE"});
+ alreadyExists = rs.next();
+ } catch (Throwable ignore) {
+ } finally {
close(rs);
}
- s=c.getConnection().createStatement();
- String[] createStatments=statements.getCreateSchemaStatements();
- for(int i=0;i<createStatments.length;i++){
+ s = c.getConnection().createStatement();
+ String[] createStatments = statements.getCreateSchemaStatements();
+ for (int i = 0; i < createStatments.length; i++) {
// This will fail usually since the tables will be
// created already.
- try{
- log.debug("Executing SQL: "+createStatments[i]);
- boolean rc=s.execute(createStatments[i]);
- }catch(SQLException e){
- if(alreadyExists){
- log.debug("Could not create JDBC tables; The message table already existed."+" Failure was: "
- +createStatments[i]+" Message: "+e.getMessage()+" SQLState: "+e.getSQLState()
- +" Vendor code: "+e.getErrorCode());
- }else{
- log.warn("Could not create JDBC tables; they could already exist."+" Failure was: "
- +createStatments[i]+" Message: "+e.getMessage()+" SQLState: "+e.getSQLState()
- +" Vendor code: "+e.getErrorCode());
- JDBCPersistenceAdapter.log("Failure details: ",e);
+ try {
+ log.debug("Executing SQL: " + createStatments[i]);
+ boolean rc = s.execute(createStatments[i]);
+ } catch (SQLException e) {
+ if (alreadyExists) {
+ log.debug("Could not create JDBC tables; The message table already existed."
+ + " Failure was: " + createStatments[i] + " Message: " + e.getMessage()
+ + " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode());
+ } else {
+ log.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
+ + createStatments[i] + " Message: " + e.getMessage() + " SQLState: "
+ + e.getSQLState() + " Vendor code: " + e.getErrorCode());
+ JDBCPersistenceAdapter.log("Failure details: ", e);
}
}
}
c.getConnection().commit();
- }finally{
- try{
+ } finally {
+ try {
s.close();
- }catch(Throwable e){
+ } catch (Throwable e) {
}
}
}
- public void doDropTables(TransactionContext c) throws SQLException,IOException{
- Statement s=null;
- try{
- s=c.getConnection().createStatement();
- String[] dropStatments=statements.getDropSchemaStatements();
- for(int i=0;i<dropStatments.length;i++){
+ public void doDropTables(TransactionContext c) throws SQLException, IOException {
+ Statement s = null;
+ try {
+ s = c.getConnection().createStatement();
+ String[] dropStatments = statements.getDropSchemaStatements();
+ for (int i = 0; i < dropStatments.length; i++) {
// This will fail usually since the tables will be
// created already.
- try{
- boolean rc=s.execute(dropStatments[i]);
- }catch(SQLException e){
- log.warn("Could not drop JDBC tables; they may not exist."+" Failure was: "+dropStatments[i]
- +" Message: "+e.getMessage()+" SQLState: "+e.getSQLState()+" Vendor code: "
- +e.getErrorCode());
- JDBCPersistenceAdapter.log("Failure details: ",e);
+ try {
+ boolean rc = s.execute(dropStatments[i]);
+ } catch (SQLException e) {
+ log.warn("Could not drop JDBC tables; they may not exist." + " Failure was: "
+ + dropStatments[i] + " Message: " + e.getMessage() + " SQLState: "
+ + e.getSQLState() + " Vendor code: " + e.getErrorCode());
+ JDBCPersistenceAdapter.log("Failure details: ", e);
}
}
c.getConnection().commit();
- }finally{
- try{
+ } finally {
+ try {
s.close();
- }catch(Throwable e){
+ } catch (Throwable e) {
}
}
}
- public long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,IOException{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
- rs=s.executeQuery();
- long seq1=0;
- if(rs.next()){
- seq1=rs.getLong(1);
+ public long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
+ rs = s.executeQuery();
+ long seq1 = 0;
+ if (rs.next()) {
+ seq1 = rs.getLong(1);
}
rs.close();
s.close();
- s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInAcksStatement());
- rs=s.executeQuery();
- long seq2=0;
- if(rs.next()){
- seq2=rs.getLong(1);
+ s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInAcksStatement());
+ rs = s.executeQuery();
+ long seq2 = 0;
+ if (rs.next()) {
+ seq2 = rs.getLong(1);
}
- return Math.max(seq1,seq2);
- }finally{
+ return Math.max(seq1, seq2);
+ } finally {
close(rs);
close(s);
}
}
- public void doAddMessage(TransactionContext c,MessageId messageID,ActiveMQDestination destination,byte[] data,
- long expiration) throws SQLException,IOException{
- PreparedStatement s=c.getAddMessageStatement();
- try{
- if(s==null){
- s=c.getConnection().prepareStatement(statements.getAddMessageStatement());
- if(batchStatments){
+ public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination,
+ byte[] data, long expiration) throws SQLException, IOException {
+ PreparedStatement s = c.getAddMessageStatement();
+ try {
+ if (s == null) {
+ s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
+ if (batchStatments) {
c.setAddMessageStatement(s);
}
}
- s.setLong(1,messageID.getBrokerSequenceId());
- s.setString(2,messageID.getProducerId().toString());
- s.setLong(3,messageID.getProducerSequenceId());
- s.setString(4,destination.getQualifiedName());
- s.setLong(5,expiration);
- setBinaryData(s,6,data);
- if(batchStatments){
+ s.setLong(1, messageID.getBrokerSequenceId());
+ s.setString(2, messageID.getProducerId().toString());
+ s.setLong(3, messageID.getProducerSequenceId());
+ s.setString(4, destination.getQualifiedName());
+ s.setLong(5, expiration);
+ setBinaryData(s, 6, data);
+ if (batchStatments) {
s.addBatch();
- }else if(s.executeUpdate()!=1){
+ } else if (s.executeUpdate() != 1) {
throw new SQLException("Failed add a message");
}
- }finally{
- if(!batchStatments){
+ } finally {
+ if (!batchStatments) {
s.close();
}
}
}
- public void doAddMessageReference(TransactionContext c,MessageId messageID,ActiveMQDestination destination,
- long expirationTime,String messageRef) throws SQLException,IOException{
- PreparedStatement s=c.getAddMessageStatement();
- try{
- if(s==null){
- s=c.getConnection().prepareStatement(statements.getAddMessageStatement());
- if(batchStatments){
+ public void doAddMessageReference(TransactionContext c, MessageId messageID,
+ ActiveMQDestination destination, long expirationTime, String messageRef)
+ throws SQLException, IOException {
+ PreparedStatement s = c.getAddMessageStatement();
+ try {
+ if (s == null) {
+ s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
+ if (batchStatments) {
c.setAddMessageStatement(s);
}
}
- s.setLong(1,messageID.getBrokerSequenceId());
- s.setString(2,messageID.getProducerId().toString());
- s.setLong(3,messageID.getProducerSequenceId());
- s.setString(4,destination.getQualifiedName());
- s.setLong(5,expirationTime);
- s.setString(6,messageRef);
- if(batchStatments){
+ s.setLong(1, messageID.getBrokerSequenceId());
+ s.setString(2, messageID.getProducerId().toString());
+ s.setLong(3, messageID.getProducerSequenceId());
+ s.setString(4, destination.getQualifiedName());
+ s.setLong(5, expirationTime);
+ s.setString(6, messageRef);
+ if (batchStatments) {
s.addBatch();
- }else if(s.executeUpdate()!=1){
+ } else if (s.executeUpdate() != 1) {
throw new SQLException("Failed add a message");
}
- }finally{
- if(!batchStatments){
+ } finally {
+ if (!batchStatments) {
s.close();
}
}
}
- public long getBrokerSequenceId(TransactionContext c,MessageId messageID) throws SQLException,IOException{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindMessageSequenceIdStatement());
- s.setString(1,messageID.getProducerId().toString());
- s.setLong(2,messageID.getProducerSequenceId());
- rs=s.executeQuery();
- if(!rs.next()){
+ public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
+ IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindMessageSequenceIdStatement());
+ s.setString(1, messageID.getProducerId().toString());
+ s.setLong(2, messageID.getProducerSequenceId());
+ rs = s.executeQuery();
+ if (!rs.next()) {
return 0;
}
return rs.getLong(1);
- }finally{
+ } finally {
close(rs);
close(s);
}
}
- public byte[] doGetMessage(TransactionContext c,long seq) throws SQLException,IOException{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindMessageStatement());
- s.setLong(1,seq);
- rs=s.executeQuery();
- if(!rs.next()){
+ public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
+ s.setLong(1, seq);
+ rs = s.executeQuery();
+ if (!rs.next()) {
return null;
}
- return getBinaryData(rs,1);
- }finally{
+ return getBinaryData(rs, 1);
+ } finally {
close(rs);
close(s);
}
}
- public String doGetMessageReference(TransactionContext c,long seq) throws SQLException,IOException{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindMessageStatement());
- s.setLong(1,seq);
- rs=s.executeQuery();
- if(!rs.next()){
+ public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
+ s.setLong(1, seq);
+ rs = s.executeQuery();
+ if (!rs.next()) {
return null;
}
return rs.getString(1);
- }finally{
+ } finally {
close(rs);
close(s);
}
}
- public void doRemoveMessage(TransactionContext c,long seq) throws SQLException,IOException{
- PreparedStatement s=c.getRemovedMessageStatement();
- try{
- if(s==null){
- s=c.getConnection().prepareStatement(statements.getRemoveMessageStatment());
- if(batchStatments){
+ public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
+ PreparedStatement s = c.getRemovedMessageStatement();
+ try {
+ if (s == null) {
+ s = c.getConnection().prepareStatement(statements.getRemoveMessageStatment());
+ if (batchStatments) {
c.setRemovedMessageStatement(s);
}
}
- s.setLong(1,seq);
- if(batchStatments){
+ s.setLong(1, seq);
+ if (batchStatments) {
s.addBatch();
- }else if(s.executeUpdate()!=1){
+ } else if (s.executeUpdate() != 1) {
throw new SQLException("Failed to remove message");
}
- }finally{
- if(!batchStatments){
+ } finally {
+ if (!batchStatments) {
s.close();
}
}
}
- public void doRecover(TransactionContext c,ActiveMQDestination destination,JDBCMessageRecoveryListener listener)
- throws Exception{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindAllMessagesStatement());
- s.setString(1,destination.getQualifiedName());
- rs=s.executeQuery();
- if(statements.isUseExternalMessageReferences()){
- while(rs.next()){
+ public void doRecover(TransactionContext c, ActiveMQDestination destination,
+ JDBCMessageRecoveryListener listener) throws Exception {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindAllMessagesStatement());
+ s.setString(1, destination.getQualifiedName());
+ rs = s.executeQuery();
+ if (statements.isUseExternalMessageReferences()) {
+ while (rs.next()) {
if (!listener.recoverMessageReference(rs.getString(2))) {
break;
}
}
- }else{
- while(rs.next()){
- if(!listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))) {
+ } else {
+ while (rs.next()) {
+ if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
break;
}
}
}
- }finally{
+ } finally {
close(rs);
close(s);
}
}
- public void doSetLastAck(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName,long seq) throws SQLException,IOException{
- PreparedStatement s=c.getAddMessageStatement();
- try{
- if(s==null){
- s=c.getConnection().prepareStatement(statements.getUpdateLastAckOfDurableSubStatement());
- if(batchStatments){
+ public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
+ String subscriptionName, long seq) throws SQLException, IOException {
+ PreparedStatement s = c.getAddMessageStatement();
+ try {
+ if (s == null) {
+ s = c.getConnection().prepareStatement(statements.getUpdateLastAckOfDurableSubStatement());
+ if (batchStatments) {
c.setUpdateLastAckStatement(s);
}
}
- s.setLong(1,seq);
- s.setString(2,destination.getQualifiedName());
- s.setString(3,clientId);
- s.setString(4,subscriptionName);
- if(batchStatments){
+ s.setLong(1, seq);
+ s.setString(2, destination.getQualifiedName());
+ s.setString(3, clientId);
+ s.setString(4, subscriptionName);
+ if (batchStatments) {
s.addBatch();
- }else if(s.executeUpdate()!=1){
+ } else if (s.executeUpdate() != 1) {
throw new SQLException("Failed add a message");
}
- }finally{
- if(!batchStatments){
+ } finally {
+ if (!batchStatments) {
s.close();
}
}
}
- public void doRecoverSubscription(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName,JDBCMessageRecoveryListener listener) throws Exception{
- // dumpTables(c, destination.getQualifiedName(),clientId,subscriptionName);
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindAllDurableSubMessagesStatement());
- s.setString(1,destination.getQualifiedName());
- s.setString(2,clientId);
- s.setString(3,subscriptionName);
- rs=s.executeQuery();
- if(statements.isUseExternalMessageReferences()){
- while(rs.next()){
- if (!listener.recoverMessageReference(rs.getString(2))){
+ public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
+ String subscriptionName, JDBCMessageRecoveryListener listener)
+ throws Exception {
+ // dumpTables(c,
+ // destination.getQualifiedName(),clientId,subscriptionName);
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindAllDurableSubMessagesStatement());
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriptionName);
+ rs = s.executeQuery();
+ if (statements.isUseExternalMessageReferences()) {
+ while (rs.next()) {
+ if (!listener.recoverMessageReference(rs.getString(2))) {
break;
}
}
- }else{
- while(rs.next()){
- if (!listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))) {
+ } else {
+ while (rs.next()) {
+ if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
break;
}
}
}
- }finally{
+ } finally {
close(rs);
close(s);
}
}
- public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
+ public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
+ String subscriptionName, long seq, int maxReturned,
+ JDBCMessageRecoveryListener listener) throws Exception {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
s.setMaxRows(maxReturned);
- s.setString(1,destination.getQualifiedName());
- s.setString(2,clientId);
- s.setString(3,subscriptionName);
- s.setLong(4,seq);
- rs=s.executeQuery();
- int count=0;
- if(statements.isUseExternalMessageReferences()){
- while(rs.next()&&count<maxReturned){
- if(listener.recoverMessageReference(rs.getString(1))){
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriptionName);
+ s.setLong(4, seq);
+ rs = s.executeQuery();
+ int count = 0;
+ if (statements.isUseExternalMessageReferences()) {
+ while (rs.next() && count < maxReturned) {
+ if (listener.recoverMessageReference(rs.getString(1))) {
count++;
- }else{
+ } else {
break;
}
}
- }else{
- while(rs.next()&&count<maxReturned){
- if(listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))){
+ } else {
+ while (rs.next() && count < maxReturned) {
+ if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++;
- }else{
+ } else {
break;
}
}
}
- }finally{
+ } finally {
close(rs);
close(s);
}
}
- public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName) throws SQLException,IOException{
- PreparedStatement s=null;
- ResultSet rs=null;
- int result=0;
- try{
- s=c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement());
- s.setString(1,destination.getQualifiedName());
- s.setString(2,clientId);
- s.setString(3,subscriptionName);
- rs=s.executeQuery();
- if(rs.next()){
- result=rs.getInt(1);
+ public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
+ String clientId, String subscriptionName)
+ throws SQLException, IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ int result = 0;
+ try {
+ s = c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement());
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriptionName);
+ rs = s.executeQuery();
+ if (rs.next()) {
+ result = rs.getInt(1);
}
- }finally{
+ } finally {
close(rs);
close(s);
}
@@ -428,306 +439,326 @@
}
/**
- * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
- * org.apache.activemq.service.SubscriptionInfo)
+ * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection,
+ * java.lang.Object, org.apache.activemq.service.SubscriptionInfo)
*/
- public void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo info,boolean retroactive) throws SQLException,IOException{
- // dumpTables(c, destination.getQualifiedName(), clientId, subscriptionName);
- PreparedStatement s=null;
- try{
- long lastMessageId=-1;
- if(!retroactive){
- s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
- ResultSet rs=null;
- try{
- rs=s.executeQuery();
- if(rs.next()){
- lastMessageId=rs.getLong(1);
+ public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive)
+ throws SQLException, IOException {
+ // dumpTables(c, destination.getQualifiedName(), clientId,
+ // subscriptionName);
+ PreparedStatement s = null;
+ try {
+ long lastMessageId = -1;
+ if (!retroactive) {
+ s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
+ ResultSet rs = null;
+ try {
+ rs = s.executeQuery();
+ if (rs.next()) {
+ lastMessageId = rs.getLong(1);
}
- }finally{
+ } finally {
close(rs);
close(s);
}
}
- s=c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
- s.setString(1,info.getDestination().getQualifiedName());
- s.setString(2,info.getClientId());
- s.setString(3,info.getSubscriptionName());
- s.setString(4,info.getSelector());
- s.setLong(5,lastMessageId);
+ s = c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
+ s.setString(1, info.getDestination().getQualifiedName());
+ s.setString(2, info.getClientId());
+ s.setString(3, info.getSubscriptionName());
+ s.setString(4, info.getSelector());
+ s.setLong(5, lastMessageId);
s.setString(6, info.getSubscribedDestination().getQualifiedName());
- if(s.executeUpdate()!=1){
- throw new IOException("Could not create durable subscription for: "+info.getClientId());
+ if (s.executeUpdate() != 1) {
+ throw new IOException("Could not create durable subscription for: " + info.getClientId());
}
- }finally{
+ } finally {
close(s);
}
}
- public SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName) throws SQLException,IOException{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindDurableSubStatement());
- s.setString(1,destination.getQualifiedName());
- s.setString(2,clientId);
- s.setString(3,subscriptionName);
- rs=s.executeQuery();
- if(!rs.next()){
+ public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
+ String clientId, String subscriptionName)
+ throws SQLException, IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindDurableSubStatement());
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriptionName);
+ rs = s.executeQuery();
+ if (!rs.next()) {
return null;
}
- SubscriptionInfo subscription=new SubscriptionInfo();
+ SubscriptionInfo subscription = new SubscriptionInfo();
subscription.setDestination(destination);
subscription.setClientId(clientId);
subscription.setSubscriptionName(subscriptionName);
subscription.setSelector(rs.getString(1));
- subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), ActiveMQDestination.QUEUE_TYPE));
+ subscription.setSubscribedDestination(ActiveMQDestination
+ .createDestination(rs.getString(2), ActiveMQDestination.QUEUE_TYPE));
return subscription;
- }finally{
+ } finally {
close(rs);
close(s);
}
}
- public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,ActiveMQDestination destination)
- throws SQLException,IOException{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement());
- s.setString(1,destination.getQualifiedName());
- rs=s.executeQuery();
- ArrayList rc=new ArrayList();
- while(rs.next()){
- SubscriptionInfo subscription=new SubscriptionInfo();
+ public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
+ throws SQLException, IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement());
+ s.setString(1, destination.getQualifiedName());
+ rs = s.executeQuery();
+ ArrayList rc = new ArrayList();
+ while (rs.next()) {
+ SubscriptionInfo subscription = new SubscriptionInfo();
subscription.setDestination(destination);
subscription.setSelector(rs.getString(1));
subscription.setSubscriptionName(rs.getString(2));
subscription.setClientId(rs.getString(3));
- subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),ActiveMQDestination.QUEUE_TYPE));
+ subscription.setSubscribedDestination(ActiveMQDestination
+ .createDestination(rs.getString(4), ActiveMQDestination.QUEUE_TYPE));
rc.add(subscription);
}
return (SubscriptionInfo[])rc.toArray(new SubscriptionInfo[rc.size()]);
- }finally{
+ } finally {
close(rs);
close(s);
}
}
- public void doRemoveAllMessages(TransactionContext c,ActiveMQDestination destinationName) throws SQLException,
- IOException{
- PreparedStatement s=null;
- try{
- s=c.getConnection().prepareStatement(statements.getRemoveAllMessagesStatement());
- s.setString(1,destinationName.getQualifiedName());
+ public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName)
+ throws SQLException, IOException {
+ PreparedStatement s = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getRemoveAllMessagesStatement());
+ s.setString(1, destinationName.getQualifiedName());
s.executeUpdate();
s.close();
- s=c.getConnection().prepareStatement(statements.getRemoveAllSubscriptionsStatement());
- s.setString(1,destinationName.getQualifiedName());
+ s = c.getConnection().prepareStatement(statements.getRemoveAllSubscriptionsStatement());
+ s.setString(1, destinationName.getQualifiedName());
s.executeUpdate();
- }finally{
+ } finally {
close(s);
}
}
- public void doDeleteSubscription(TransactionContext c,ActiveMQDestination destination,String clientId,
- String subscriptionName) throws SQLException,IOException{
- PreparedStatement s=null;
- try{
- s=c.getConnection().prepareStatement(statements.getDeleteSubscriptionStatement());
- s.setString(1,destination.getQualifiedName());
- s.setString(2,clientId);
- s.setString(3,subscriptionName);
+ public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
+ String subscriptionName) throws SQLException, IOException {
+ PreparedStatement s = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getDeleteSubscriptionStatement());
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriptionName);
s.executeUpdate();
- }finally{
+ } finally {
close(s);
}
}
- public void doDeleteOldMessages(TransactionContext c) throws SQLException,IOException{
- PreparedStatement s=null;
- try{
- log.debug("Executing SQL: "+statements.getDeleteOldMessagesStatement());
- s=c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement());
- s.setLong(1,System.currentTimeMillis());
- int i=s.executeUpdate();
- log.debug("Deleted "+i+" old message(s).");
- }finally{
+ public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
+ PreparedStatement s = null;
+ try {
+ log.debug("Executing SQL: " + statements.getDeleteOldMessagesStatement());
+ s = c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement());
+ s.setLong(1, System.currentTimeMillis());
+ int i = s.executeUpdate();
+ log.debug("Deleted " + i + " old message(s).");
+ } finally {
close(s);
}
}
-
- public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriberName) throws SQLException,IOException{
- PreparedStatement s=null;
- ResultSet rs=null;
+
+ public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,
+ ActiveMQDestination destination, String clientId,
+ String subscriberName) throws SQLException,
+ IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
long result = -1;
- try{
- s=c.getConnection().prepareStatement(statements.getLastAckedDurableSubscriberMessageStatement());
- s.setString(1,destination.getQualifiedName());
- s.setString(2,clientId);
- s.setString(3,subscriberName);
- rs=s.executeQuery();
- if(rs.next()){
- result=rs.getLong(1);
+ try {
+ s = c.getConnection()
+ .prepareStatement(statements.getLastAckedDurableSubscriberMessageStatement());
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriberName);
+ rs = s.executeQuery();
+ if (rs.next()) {
+ result = rs.getLong(1);
}
rs.close();
s.close();
- }finally{
+ } finally {
close(rs);
close(s);
}
return result;
}
- static private void close(PreparedStatement s){
- try{
+ static private void close(PreparedStatement s) {
+ try {
s.close();
- }catch(Throwable e){
+ } catch (Throwable e) {
}
}
- static private void close(ResultSet rs){
- try{
+ static private void close(ResultSet rs) {
+ try {
rs.close();
- }catch(Throwable e){
+ } catch (Throwable e) {
}
}
- public Set doGetDestinations(TransactionContext c) throws SQLException,IOException{
- HashSet rc=new HashSet();
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement());
- rs=s.executeQuery();
- while(rs.next()){
- rc.add(ActiveMQDestination.createDestination(rs.getString(1),ActiveMQDestination.QUEUE_TYPE));
+ public Set doGetDestinations(TransactionContext c) throws SQLException, IOException {
+ HashSet rc = new HashSet();
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement());
+ rs = s.executeQuery();
+ while (rs.next()) {
+ rc
+ .add(ActiveMQDestination.createDestination(rs.getString(1),
+ ActiveMQDestination.QUEUE_TYPE));
}
- }finally{
+ } finally {
close(rs);
close(s);
}
return rc;
}
- public boolean isBatchStatments(){
+ public boolean isBatchStatments() {
return batchStatments;
}
- public void setBatchStatments(boolean batchStatments){
- this.batchStatments=batchStatments;
+ public void setBatchStatments(boolean batchStatments) {
+ this.batchStatments = batchStatments;
}
- public void setUseExternalMessageReferences(boolean useExternalMessageReferences){
+ public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
statements.setUseExternalMessageReferences(useExternalMessageReferences);
}
- public Statements getStatements(){
+ public Statements getStatements() {
return statements;
}
- public void setStatements(Statements statements){
- this.statements=statements;
+ public void setStatements(Statements statements) {
+ this.statements = statements;
}
- public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,ActiveMQDestination destination,
- String clientId,String subscriberName) throws SQLException,IOException{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement());
- s.setString(1,destination.getQualifiedName());
- s.setString(2,clientId);
- s.setString(3,subscriberName);
- rs=s.executeQuery();
- if(!rs.next()){
+ public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,
+ ActiveMQDestination destination,
+ String clientId, String subscriberName)
+ throws SQLException, IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement());
+ s.setString(1, destination.getQualifiedName());
+ s.setString(2, clientId);
+ s.setString(3, subscriberName);
+ rs = s.executeQuery();
+ if (!rs.next()) {
return null;
}
- return getBinaryData(rs,1);
- }finally{
+ return getBinaryData(rs, 1);
+ } finally {
close(rs);
close(s);
}
- }
-
- public int doGetMessageCount(TransactionContext c,ActiveMQDestination destination) throws SQLException, IOException{
- PreparedStatement s=null;
- ResultSet rs=null;
- int result=0;
- try{
- s=c.getConnection().prepareStatement(statements.getDestinationMessageCountStatement());
- s.setString(1,destination.getQualifiedName());
- rs=s.executeQuery();
- if(rs.next()){
- result=rs.getInt(1);
+ }
+
+ public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
+ IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ int result = 0;
+ try {
+ s = c.getConnection().prepareStatement(statements.getDestinationMessageCountStatement());
+ s.setString(1, destination.getQualifiedName());
+ rs = s.executeQuery();
+ if (rs.next()) {
+ result = rs.getInt(1);
}
- }finally{
+ } finally {
close(rs);
close(s);
}
return result;
}
-
- public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,
- int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
+ public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
+ int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
s.setMaxRows(maxReturned);
- s.setString(1,destination.getQualifiedName());
- s.setLong(2,nextSeq);
- rs=s.executeQuery();
- int count=0;
- if(statements.isUseExternalMessageReferences()){
- while(rs.next()&&count<maxReturned){
- if(listener.recoverMessageReference(rs.getString(1))){
+ s.setString(1, destination.getQualifiedName());
+ s.setLong(2, nextSeq);
+ rs = s.executeQuery();
+ int count = 0;
+ if (statements.isUseExternalMessageReferences()) {
+ while (rs.next() && count < maxReturned) {
+ if (listener.recoverMessageReference(rs.getString(1))) {
count++;
- }else{
+ } else {
log.debug("Stopped recover next messages");
}
}
- }else{
- while(rs.next()&&count<maxReturned){
- if(listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2))){
+ } else {
+ while (rs.next() && count < maxReturned) {
+ if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++;
- }else{
+ } else {
log.debug("Stopped recover next messages");
}
}
}
- }catch(Exception e){
+ } catch (Exception e) {
e.printStackTrace();
- }finally{
+ } finally {
close(rs);
close(s);
}
}
/*
- * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
- * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
- * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID,
- * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
- * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID");
- * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
+ * Useful for debugging. public void dumpTables(Connection c, String
+ * destinationName, String clientId, String subscriptionName) throws
+ * SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
+ * printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
+ * PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID
+ * FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND
+ * D.CLIENT_ID=? AND D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID >
+ * D.LAST_ACKED_ID" +" ORDER BY M.ID"); s.setString(1,destinationName);
+ * s.setString(2,clientId); s.setString(3,subscriptionName);
* printQuery(s,System.out); }
*
- * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS",
- * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); }
+ * public void dumpTables(Connection c) throws SQLException { printQuery(c,
+ * "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, "Select * from
+ * ACTIVEMQ_ACKS", System.out); }
*
- * private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
- * printQuery(c.prepareStatement(query), out); }
+ * private void printQuery(Connection c, String query, PrintStream out)
+ * throws SQLException { printQuery(c.prepareStatement(query), out); }
*
- * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
+ * private void printQuery(PreparedStatement s, PrintStream out) throws
+ * SQLException {
*
- * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<=
- * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); }
- * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|");
- * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
- * try { s.close(); } catch (Throwable ignore) {} } }
+ * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData
+ * metaData = set.getMetaData(); for( int i=1; i<=
+ * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||");
+ * out.print(metaData.getColumnName(i)+"||"); } out.println();
+ * while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) {
+ * if(i==1) out.print("|"); out.print(set.getString(i)+"|"); }
+ * out.println(); } } finally { try { set.close(); } catch (Throwable
+ * ignore) {} try { s.close(); } catch (Throwable ignore) {} } }
*/
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java Wed Aug 8 11:56:59 2007
@@ -26,12 +26,12 @@
import org.apache.activemq.util.ByteArrayInputStream;
/**
- * This JDBCAdapter inserts and extracts BLOB data using the
+ * This JDBCAdapter inserts and extracts BLOB data using the
* setBinaryStream()/getBinaryStream() operations.
*
* The databases/JDBC drivers that use this adapter are:
* <ul>
- * <li>Axion</li>
+ * <li>Axion</li>
* </ul>
*
* @org.apache.xbean.XBean element="streamJDBCAdapter"
@@ -39,12 +39,13 @@
* @version $Revision: 1.2 $
*/
public class StreamJDBCAdapter extends DefaultJDBCAdapter {
-
+
/**
- * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet, int)
+ * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet,
+ * int)
*/
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
-
+
try {
InputStream is = rs.getBinaryStream(index);
ByteArrayOutputStream os = new ByteArrayOutputStream(1024 * 4);
@@ -58,15 +59,16 @@
return os.toByteArray();
} catch (IOException e) {
- throw (SQLException)new SQLException("Error reading binary parameter: "+index).initCause(e);
+ throw (SQLException)new SQLException("Error reading binary parameter: " + index).initCause(e);
}
}
-
+
/**
- * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement, int, byte[])
+ * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement,
+ * int, byte[])
*/
protected void setBinaryData(PreparedStatement s, int index, byte[] data) throws SQLException {
s.setBinaryStream(index, new ByteArrayInputStream(data), data.length);
}
-
+
}