You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/12/21 11:53:45 UTC

svn commit: r892759 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc: JDBCAdapter.java JDBCMessageIdScanListener.java JDBCMessageStore.java JDBCPersistenceAdapter.java Statements.java adapter/DefaultJDBCAdapter.java

Author: dejanb
Date: Mon Dec 21 10:53:44 2009
New Revision: 892759

URL: http://svn.apache.org/viewvc?rev=892759&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2473 - improving JDBC audit recovery

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Dec 21 10:53:44 2009
@@ -82,5 +82,5 @@
 
     long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
 
-    void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
+    void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java Mon Dec 21 10:53:44 2009
@@ -19,5 +19,5 @@
 import org.apache.activemq.command.MessageId;
 
 public interface JDBCMessageIdScanListener {
-    boolean messageId(MessageId id);
+    void messageId(MessageId id);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Mon Dec 21 10:53:44 2009
@@ -53,26 +53,6 @@
         this.adapter = adapter;
         this.wireFormat = wireFormat;
         this.audit = audit;
-        initAudit();
-    }
-
-    /*
-     * revisit: This can be destination agnostic and back in the jdbc persistence adapter start
-     */
-    public void initAudit() {
-        if (audit != null) {
-            try {
-                TransactionContext c = persistenceAdapter.getTransactionContext(null);
-                adapter.doMessageIdScan(c, destination, 100, new JDBCMessageIdScanListener() {
-                    public boolean messageId(MessageId id) {
-                        audit.isDuplicate(id);
-                        return true;
-                    }
-                });
-            } catch (Exception e) {
-                LOG.error("Failed to reload store message audit for queue store " + destination);
-            }
-        }
     }
     
     public void addMessage(ConnectionContext context, Message message) throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Mon Dec 21 10:53:44 2009
@@ -35,6 +35,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -90,6 +91,7 @@
     protected int maxProducersToAudit=1024;
     protected int maxAuditDepth=1000;
     protected boolean enableAudit=true;
+    protected int auditRecoveryDepth = 1024;
     protected ActiveMQMessageAudit audit;
 
     public JDBCPersistenceAdapter() {
@@ -126,15 +128,33 @@
         return Collections.EMPTY_SET;
     }
     
-    protected ActiveMQMessageAudit createMessageAudit() {
-    	if (enableAudit && audit == null) {
-    		audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+    protected void createMessageAudit() {
+        if (enableAudit && audit == null) {
+            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+            TransactionContext c = null;
+            
+            try {
+                c = getTransactionContext();
+                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
+                    public void messageId(MessageId id) {
+                        audit.isDuplicate(id);
+                    }
+                });
+            } catch (Exception e) {
+                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
+            } finally {
+                if (c != null) {
+                    try {
+                        c.close();
+                    } catch (Throwable e) {
+                    }
+                }
+            }
     	}
-    	return audit;
     }
 
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
+        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
         if (transactionStore != null) {
             rc = transactionStore.proxy(rc);
         }
@@ -142,7 +162,7 @@
     }
 
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
-        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
+        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
         if (transactionStore != null) {
             rc = transactionStore.proxy(rc);
         }
@@ -234,6 +254,8 @@
                 }
             }, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
         }
+        
+        createMessageAudit();
     }
 
     public synchronized void stop() throws Exception {
@@ -625,6 +647,13 @@
 	public void setEnableAudit(boolean enableAudit) {
 		this.enableAudit = enableAudit;
 	}
-    
+
+    public int getAuditRecoveryDepth() {
+        return auditRecoveryDepth;
+    }
+
+    public void setAuditRecoveryDepth(int auditRecoveryDepth) {
+        this.auditRecoveryDepth = auditRecoveryDepth;
+    }
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Mon Dec 21 10:53:44 2009
@@ -152,7 +152,7 @@
         // and work back for X
         if (findAllMessageIdsStatement == null) {
             findAllMessageIdsStatement = "SELECT ID, MSGID_PROD, MSGID_SEQ FROM " + getFullMessageTableName()
-                                       + " WHERE CONTAINER=? ORDER BY ID DESC";
+                                       + " ORDER BY ID DESC";
         }
         return findAllMessageIdsStatement;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Mon Dec 21 10:53:44 2009
@@ -327,20 +327,16 @@
         }
     }
 
-    public void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit, 
+    public void doMessageIdScan(TransactionContext c, int limit, 
             JDBCMessageIdScanListener listener) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
-            s.setString(1, destination.getQualifiedName());
-            // limit the query. just need the the last few messages that could be replayed 
-            // on recovery. send or commit reply lost so it gets replayed.
+            s.setMaxRows(limit);
             rs = s.executeQuery();
             while (rs.next()) {
-                if (!listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)))) { 
-                    break;
-                }
+                listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)));
             }
         } finally {
             close(rs);