You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/04/20 13:52:51 UTC

svn commit: r1095376 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/

Author: gtully
Date: Wed Apr 20 11:52:50 2011
New Revision: 1095376

URL: http://svn.apache.org/viewvc?rev=1095376&view=rev
Log:
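https://issues.apache.org/jira/browse/AMQ-3289 - ActiveMQ has problems storing >4K messages in Oracle - brought BlobJDBCAdapter up to date with the current adapter method signatures (doAddMessage, doGetMessage) so that it overrides them again, and added @Override to the other adapters so that this sort of signature mismatch is caught at compile time in the future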

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DB2JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ImageBasedJDBCAdaptor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/InformixJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MaxDBJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MySqlJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/PostgresqlJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/SybaseJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -30,6 +30,7 @@ import org.apache.activemq.store.jdbc.St
  */
 public class AxionJDBCAdapter extends StreamJDBCAdapter {
 
+    @Override
     public void setStatements(Statements statements) {
         
         String[] createStatements = new String[]{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -27,6 +27,8 @@ import java.sql.SQLException;
 
 import javax.jms.JMSException;
 
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.jdbc.TransactionContext;
 import org.apache.activemq.util.ByteArrayOutputStream;
 
@@ -49,30 +51,34 @@ import org.apache.activemq.util.ByteArra
  */
 public class BlobJDBCAdapter extends DefaultJDBCAdapter {
 
-    public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data)
-        throws SQLException, JMSException {
+    @Override
+    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
+            long expiration, byte priority) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
 
             // Add the Blob record.
-            s = c.prepareStatement(statements.getAddMessageStatement());
-            s.setLong(1, seq);
-            s.setString(2, destinationName);
-            s.setString(3, messageID);
-            s.setString(4, " ");
+            s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
+            s.setLong(1, sequence);
+            s.setString(2, messageID.getProducerId().toString());
+            s.setLong(3, messageID.getProducerSequenceId());
+            s.setString(4, destination.getQualifiedName());
+            s.setLong(5, expiration);
+            s.setLong(6, priority);
 
             if (s.executeUpdate() != 1) {
-                throw new JMSException("Failed to broker message: " + messageID + " in container.");
+                throw new IOException("Failed to add broker message: " + messageID + " in container.");
             }
             s.close();
 
             // Select the blob record so that we can update it.
-            s = c.prepareStatement(statements.getFindMessageStatement());
-            s.setLong(1, seq);
+            s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement());
+            s.setLong(1, sequence);
             rs = s.executeQuery();
             if (!rs.next()) {
-                throw new JMSException("Failed to broker message: " + messageID + " in container.");
+                throw new IOException("Failed select blob for message: " + messageID + " in container.");
             }
 
             // Update the blob
@@ -83,31 +89,27 @@ public class BlobJDBCAdapter extends Def
             s.close();
 
             // Update the row with the updated blob
-            s = c.prepareStatement(statements.getUpdateMessageStatement());
+            s = c.getConnection().prepareStatement(statements.getUpdateMessageStatement());
             s.setBlob(1, blob);
-            s.setLong(2, seq);
+            s.setLong(2, sequence);
 
-        } catch (IOException e) {
-            throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
         } finally {
-            try {
-                rs.close();
-            } catch (Throwable ignore) {
-            }
-            try {
-                s.close();
-            } catch (Throwable ignore) {
-            }
+            cleanupExclusiveLock.readLock().unlock();
+            close(rs);
+            close(s);
         }
     }
 
-    public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException {
+    @Override
+    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
         try {
 
             s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
-            s.setLong(1, seq);
+            s.setString(1, id.getProducerId().toString());
+            s.setLong(2, id.getProducerSequenceId());
             rs = s.executeQuery();
 
             if (!rs.next()) {
@@ -126,17 +128,10 @@ public class BlobJDBCAdapter extends Def
 
             return os.toByteArray();
 
-        } catch (IOException e) {
-            throw (SQLException)new SQLException("BLOB could not be updated: " + e).initCause(e);
         } finally {
-            try {
-                rs.close();
-            } catch (Throwable ignore) {
-            }
-            try {
-                s.close();
-            } catch (Throwable ignore) {
-            }
+            cleanupExclusiveLock.readLock().unlock();
+            close(rs);
+            close(s);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -34,6 +34,7 @@ public class BytesJDBCAdapter extends De
      * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet,
      *      int)
      */
+    @Override
     protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
         return rs.getBytes(index);
     }
@@ -42,6 +43,7 @@ public class BytesJDBCAdapter extends De
      * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement,
      *      int, byte[])
      */
+    @Override
     protected void setBinaryData(PreparedStatement s, int index, byte[] data) throws SQLException {
         s.setBytes(index, data);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DB2JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DB2JDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DB2JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DB2JDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.store.jdbc.St
  */
 public class DB2JDBCAdapter extends DefaultJDBCAdapter {
 
+    @Override
     public void setStatements(Statements statements) {
         String lockCreateStatement = "LOCK TABLE " + statements.getFullLockTableName() + " IN EXCLUSIVE MODE";
         statements.setLockCreateStatement(lockCreateStatement);
@@ -35,6 +36,7 @@ public class DB2JDBCAdapter extends Defa
         super.setStatements(statements);
     }
 
+    @Override
     protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
         // Get as a BLOB
         Blob aBlob = rs.getBlob(index);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -780,14 +780,14 @@ public class DefaultJDBCAdapter implemen
         return result;
     }
 
-    private static void close(PreparedStatement s) {
+    protected static void close(PreparedStatement s) {
         try {
             s.close();
         } catch (Throwable e) {
         }
     }
 
-    private static void close(ResultSet rs) {
+    protected static void close(ResultSet rs) {
         try {
             rs.close();
         } catch (Throwable e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -24,6 +24,7 @@ import org.apache.activemq.store.jdbc.St
  */
 public class HsqldbJDBCAdapter extends BytesJDBCAdapter {
 
+    @Override
     public void setStatements(Statements statements) {
         statements.setBinaryDataType("OTHER");
         super.setStatements(statements);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ImageBasedJDBCAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ImageBasedJDBCAdaptor.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ImageBasedJDBCAdaptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ImageBasedJDBCAdaptor.java Wed Apr 20 11:52:50 2011
@@ -32,6 +32,7 @@ import org.apache.activemq.store.jdbc.St
  */
 public class ImageBasedJDBCAdaptor extends DefaultJDBCAdapter {
 
+    @Override
     public void setStatements(Statements statements) {
         statements.setBinaryDataType("IMAGE");
         super.setStatements(statements);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/InformixJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/InformixJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/InformixJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/InformixJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.store.jdbc.St
  */
 public class InformixJDBCAdapter extends BlobJDBCAdapter {
 
+    @Override
     public void setStatements(Statements statements) {
         statements.setContainerNameDataType("VARCHAR(150)");
         statements.setStringIdDataType("VARCHAR(150)");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MaxDBJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MaxDBJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MaxDBJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MaxDBJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -24,6 +24,7 @@ import org.apache.activemq.store.jdbc.St
  */
 public class MaxDBJDBCAdapter extends DefaultJDBCAdapter {
 
+    @Override
     public void setStatements(Statements statements) {
         statements.setBinaryDataType("LONG BYTE");
         statements.setStringIdDataType("VARCHAR(250) ASCII");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MySqlJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MySqlJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MySqlJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/MySqlJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -42,6 +42,7 @@ public class MySqlJDBCAdapter extends De
     String engineType = INNODB;
     String typeStatement = "ENGINE";
 
+    @Override
     public void setStatements(Statements statements) {
         String type = engineType.toUpperCase();
         if( !type.equals(INNODB) &&  !type.equals(NDBCLUSTER) ) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -44,12 +44,14 @@ import org.apache.activemq.store.jdbc.St
  */
 public class OracleJDBCAdapter extends BlobJDBCAdapter {
 
+    @Override
     public void setStatements(Statements statements) {
         statements.setLongDataType("NUMBER");
         statements.setSequenceDataType("NUMBER");
         super.setStatements(statements);
     }
-    
+
+    @Override
     protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
         // Get as a BLOB
         Blob aBlob = rs.getBlob(index);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/PostgresqlJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/PostgresqlJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/PostgresqlJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/PostgresqlJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -40,6 +40,7 @@ import org.apache.activemq.store.jdbc.St
 public class PostgresqlJDBCAdapter extends BytesJDBCAdapter {
     public String acksPkName = "activemq_acks_pkey";
 
+    @Override
     public void setStatements(Statements statements) {
         statements.setBinaryDataType("BYTEA");
         statements.setDropAckPKAlterStatementEnd("DROP CONSTRAINT \"" + getAcksPkName() + "\"");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/StreamJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -44,6 +44,7 @@ public class StreamJDBCAdapter extends D
      * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet,
      *      int)
      */
+    @Override
     protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
 
         try {
@@ -67,6 +68,7 @@ public class StreamJDBCAdapter extends D
      * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement,
      *      int, byte[])
      */
+    @Override
     protected void setBinaryData(PreparedStatement s, int index, byte[] data) throws SQLException {
         s.setBinaryStream(index, new ByteArrayInputStream(data), data.length);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/SybaseJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/SybaseJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/SybaseJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/SybaseJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -25,6 +25,7 @@ import org.apache.activemq.store.jdbc.St
  * 
  */
 public class SybaseJDBCAdapter extends ImageBasedJDBCAdaptor {
+    @Override
     public void setStatements(Statements statements) {
         statements.setLockCreateStatement("LOCK TABLE " + statements.getFullLockTableName() + " IN EXCLUSIVE MODE");
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java?rev=1095376&r1=1095375&r2=1095376&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactJDBCAdapter.java Wed Apr 20 11:52:50 2011
@@ -26,6 +26,7 @@ import org.apache.activemq.store.jdbc.St
  * 
  */
 public class TransactJDBCAdapter extends ImageBasedJDBCAdaptor {
+    @Override
     public void setStatements(Statements statements) {
         String lockCreateStatement = "SELECT * FROM " + statements.getFullLockTableName();