You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/12/21 11:53:45 UTC
svn commit: r892759 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc:
JDBCAdapter.java JDBCMessageIdScanListener.java JDBCMessageStore.java
JDBCPersistenceAdapter.java Statements.java adapter/DefaultJDBCAdapter.java
Author: dejanb
Date: Mon Dec 21 10:53:44 2009
New Revision: 892759
URL: http://svn.apache.org/viewvc?rev=892759&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2473 - improving jdbc audit recovery
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Dec 21 10:53:44 2009
@@ -82,5 +82,5 @@
long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
- void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
+ void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageIdScanListener.java Mon Dec 21 10:53:44 2009
@@ -19,5 +19,5 @@
import org.apache.activemq.command.MessageId;
public interface JDBCMessageIdScanListener {
- boolean messageId(MessageId id);
+ void messageId(MessageId id);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Mon Dec 21 10:53:44 2009
@@ -53,26 +53,6 @@
this.adapter = adapter;
this.wireFormat = wireFormat;
this.audit = audit;
- initAudit();
- }
-
- /*
- * revisit: This can be destination agnostic and back in the jdbc persistence adapter start
- */
- public void initAudit() {
- if (audit != null) {
- try {
- TransactionContext c = persistenceAdapter.getTransactionContext(null);
- adapter.doMessageIdScan(c, destination, 100, new JDBCMessageIdScanListener() {
- public boolean messageId(MessageId id) {
- audit.isDuplicate(id);
- return true;
- }
- });
- } catch (Exception e) {
- LOG.error("Failed to reload store message audit for queue store " + destination);
- }
- }
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Mon Dec 21 10:53:44 2009
@@ -35,6 +35,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -90,6 +91,7 @@
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000;
protected boolean enableAudit=true;
+ protected int auditRecoveryDepth = 1024;
protected ActiveMQMessageAudit audit;
public JDBCPersistenceAdapter() {
@@ -126,15 +128,33 @@
return Collections.EMPTY_SET;
}
- protected ActiveMQMessageAudit createMessageAudit() {
- if (enableAudit && audit == null) {
- audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+ protected void createMessageAudit() {
+ if (enableAudit && audit == null) {
+ audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+ TransactionContext c = null;
+
+ try {
+ c = getTransactionContext();
+ getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
+ public void messageId(MessageId id) {
+ audit.isDuplicate(id);
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
+ } finally {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (Throwable e) {
+ }
+ }
+ }
}
- return audit;
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
- MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
+ MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
@@ -142,7 +162,7 @@
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
- TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
+ TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
@@ -234,6 +254,8 @@
}
}, cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
}
+
+ createMessageAudit();
}
public synchronized void stop() throws Exception {
@@ -625,6 +647,13 @@
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
}
-
+
+ public int getAuditRecoveryDepth() {
+ return auditRecoveryDepth;
+ }
+
+ public void setAuditRecoveryDepth(int auditRecoveryDepth) {
+ this.auditRecoveryDepth = auditRecoveryDepth;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Mon Dec 21 10:53:44 2009
@@ -152,7 +152,7 @@
// and work back for X
if (findAllMessageIdsStatement == null) {
findAllMessageIdsStatement = "SELECT ID, MSGID_PROD, MSGID_SEQ FROM " + getFullMessageTableName()
- + " WHERE CONTAINER=? ORDER BY ID DESC";
+ + " ORDER BY ID DESC";
}
return findAllMessageIdsStatement;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=892759&r1=892758&r2=892759&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Mon Dec 21 10:53:44 2009
@@ -327,20 +327,16 @@
}
}
- public void doMessageIdScan(TransactionContext c, ActiveMQDestination destination, long limit,
+ public void doMessageIdScan(TransactionContext c, int limit,
JDBCMessageIdScanListener listener) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
- s.setString(1, destination.getQualifiedName());
- // limit the query. just need the the last few messages that could be replayed
- // on recovery. send or commit reply lost so it gets replayed.
+ s.setMaxRows(limit);
rs = s.executeQuery();
while (rs.next()) {
- if (!listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)))) {
- break;
- }
+ listener.messageId(new MessageId(rs.getString(2), rs.getLong(3)));
}
} finally {
close(rs);