You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/04/20 13:52:51 UTC
svn commit: r1095376 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/
Author: gtully
Date: Wed Apr 20 11:52:50 2011
New Revision: 1095376
URL: http://svn.apache.org/viewvc?rev=1095376&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3289 - ActiveMQ has problems storing >4K messages in Oracle - brought BlobAdapter up to speed and added @Override to other adapters to catch this sort of problem in the future
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DB2JDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ImageBasedJDBCAdaptor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/InformixJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MaxDBJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MySqlJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/PostgresqlJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/SybaseJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -30,6 +30,7 @@ import org.apache.activemq.store.jdbc.St
*/
public class AxionJDBCAdapter extends StreamJDBCAdapter {
+ @Override
public void setStatements(Statements statements) {
String[] createStatements = new String[]{
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -27,6 +27,8 @@ import java.sql.SQLException;
import javax.jms.JMSException;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.ByteArrayOutputStream;
@@ -49,30 +51,34 @@ import org.apache.activemq.util.ByteArra
*/
public class BlobJDBCAdapter extends DefaultJDBCAdapter {
- public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data)
- throws SQLException, JMSException {
+ @Override
+ public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
+ long expiration, byte priority) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
// Add the Blob record.
- s = c.prepareStatement(statements.getAddMessageStatement());
- s.setLong(1, seq);
- s.setString(2, destinationName);
- s.setString(3, messageID);
- s.setString(4, " ");
+ s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
+ s.setLong(1, sequence);
+ s.setString(2, messageID.getProducerId().toString());
+ s.setLong(3, messageID.getProducerSequenceId());
+ s.setString(4, destination.getQualifiedName());
+ s.setLong(5, expiration);
+ s.setLong(6, priority);
if (s.executeUpdate() != 1) {
- throw new JMSException("Failed to broker message: " + messageID + " in container.");
+ throw new IOException("Failed to add broker message: " + messageID + " in container.");
}
s.close();
// Select the blob record so that we can update it.
- s = c.prepareStatement(statements.getFindMessageStatement());
- s.setLong(1, seq);
+ s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement());
+ s.setLong(1, sequence);
rs = s.executeQuery();
if (!rs.next()) {
- throw new JMSException("Failed to broker message: " + messageID + " in container.");
+ throw new IOException("Failed select blob for message: " + messageID + " in container.");
}
// Update the blob
@@ -83,31 +89,27 @@ public class BlobJDBCAdapter extends Def
s.close();
// Update the row with the updated blob
- s = c.prepareStatement(statements.getUpdateMessageStatement());
+ s = c.getConnection().prepareStatement(statements.getUpdateMessageStatement());
s.setBlob(1, blob);
- s.setLong(2, seq);
+ s.setLong(2, sequence);
- } catch (IOException e) {
- throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
} finally {
- try {
- rs.close();
- } catch (Throwable ignore) {
- }
- try {
- s.close();
- } catch (Throwable ignore) {
- }
+ cleanupExclusiveLock.readLock().unlock();
+ close(rs);
+ close(s);
}
}
- public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException {
+ @Override
+ public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
+ cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
- s.setLong(1, seq);
+ s.setString(1, id.getProducerId().toString());
+ s.setLong(2, id.getProducerSequenceId());
rs = s.executeQuery();
if (!rs.next()) {
@@ -126,17 +128,10 @@ public class BlobJDBCAdapter extends Def
return os.toByteArray();
- } catch (IOException e) {
- throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
} finally {
- try {
- rs.close();
- } catch (Throwable ignore) {
- }
- try {
- s.close();
- } catch (Throwable ignore) {
- }
+ cleanupExclusiveLock.readLock().unlock();
+ close(rs);
+ close(s);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -34,6 +34,7 @@ public class BytesJDBCAdapter extends De
* @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet,
* int)
*/
+ @Override
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
return rs.getBytes(index);
}
@@ -42,6 +43,7 @@ public class BytesJDBCAdapter extends De
* @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement,
* int, byte[])
*/
+ @Override
protected void setBinaryData(PreparedStatement s, int index, byte[] data) throws SQLException {
s.setBytes(index, data);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DB2JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DB2JDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DB2JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DB2JDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.store.jdbc.St
*/
public class DB2JDBCAdapter extends DefaultJDBCAdapter {
+ @Override
public void setStatements(Statements statements) {
String lockCreateStatement = "LOCK TABLE " + statements.getFullLockTableName() + " IN EXCLUSIVE MODE";
statements.setLockCreateStatement(lockCreateStatement);
@@ -35,6 +36,7 @@ public class DB2JDBCAdapter extends Defa
super.setStatements(statements);
}
+ @Override
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
// Get as a BLOB
Blob aBlob = rs.getBlob(index);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -780,14 +780,14 @@ public class DefaultJDBCAdapter implemen
return result;
}
- private static void close(PreparedStatement s) {
+ protected static void close(PreparedStatement s) {
try {
s.close();
} catch (Throwable e) {
}
}
- private static void close(ResultSet rs) {
+ protected static void close(ResultSet rs) {
try {
rs.close();
} catch (Throwable e) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -24,6 +24,7 @@ import org.apache.activemq.store.jdbc.St
*/
public class HsqldbJDBCAdapter extends BytesJDBCAdapter {
+ @Override
public void setStatements(Statements statements) {
statements.setBinaryDataType("OTHER");
super.setStatements(statements);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ImageBasedJDBCAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ImageBasedJDBCAdaptor.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ImageBasedJDBCAdaptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ImageBasedJDBCAdaptor.java Wed Apr 20 11:52:50 2011
@@ -32,6 +32,7 @@ import org.apache.activemq.store.jdbc.St
*/
public class ImageBasedJDBCAdaptor extends DefaultJDBCAdapter {
+ @Override
public void setStatements(Statements statements) {
statements.setBinaryDataType("IMAGE");
super.setStatements(statements);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/InformixJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/InformixJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/InformixJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/InformixJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.store.jdbc.St
*/
public class InformixJDBCAdapter extends BlobJDBCAdapter {
+ @Override
public void setStatements(Statements statements) {
statements.setContainerNameDataType("VARCHAR(150)");
statements.setStringIdDataType("VARCHAR(150)");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MaxDBJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MaxDBJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MaxDBJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MaxDBJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -24,6 +24,7 @@ import org.apache.activemq.store.jdbc.St
*/
public class MaxDBJDBCAdapter extends DefaultJDBCAdapter {
+ @Override
public void setStatements(Statements statements) {
statements.setBinaryDataType("LONG BYTE");
statements.setStringIdDataType("VARCHAR(250) ASCII");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MySqlJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MySqlJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MySqlJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MySqlJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -42,6 +42,7 @@ public class MySqlJDBCAdapter extends De
String engineType = INNODB;
String typeStatement = "ENGINE";
+ @Override
public void setStatements(Statements statements) {
String type = engineType.toUpperCase();
if( !type.equals(INNODB) && !type.equals(NDBCLUSTER) ) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -44,12 +44,14 @@ import org.apache.activemq.store.jdbc.St
*/
public class OracleJDBCAdapter extends BlobJDBCAdapter {
+ @Override
public void setStatements(Statements statements) {
statements.setLongDataType("NUMBER");
statements.setSequenceDataType("NUMBER");
super.setStatements(statements);
}
-
+
+ @Override
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
// Get as a BLOB
Blob aBlob = rs.getBlob(index);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/PostgresqlJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/PostgresqlJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/PostgresqlJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/PostgresqlJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -40,6 +40,7 @@ import org.apache.activemq.store.jdbc.St
public class PostgresqlJDBCAdapter extends BytesJDBCAdapter {
public String acksPkName = "activemq_acks_pkey";
+ @Override
public void setStatements(Statements statements) {
statements.setBinaryDataType("BYTEA");
statements.setDropAckPKAlterStatementEnd("DROP CONSTRAINT \"" + getAcksPkName() + "\"");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -44,6 +44,7 @@ public class StreamJDBCAdapter extends D
* @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet,
* int)
*/
+ @Override
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
try {
@@ -67,6 +68,7 @@ public class StreamJDBCAdapter extends D
* @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement,
* int, byte[])
*/
+ @Override
protected void setBinaryData(PreparedStatement s, int index, byte[] data) throws SQLException {
s.setBinaryStream(index, new ByteArrayInputStream(data), data.length);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/SybaseJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/SybaseJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/SybaseJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/SybaseJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -25,6 +25,7 @@ import org.apache.activemq.store.jdbc.St
*
*/
public class SybaseJDBCAdapter extends ImageBasedJDBCAdaptor {
+ @Override
public void setStatements(Statements statements) {
statements.setLockCreateStatement("LOCK TABLE " + statements.getFullLockTableName() + " IN EXCLUSIVE MODE");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -26,6 +26,7 @@ import org.apache.activemq.store.jdbc.St
*
*/
public class TransactJDBCAdapter extends ImageBasedJDBCAdaptor {
+ @Override
public void setStatements(Statements statements) {
String lockCreateStatement = "SELECT * FROM " + statements.getFullLockTableName();