You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/12/10 11:05:21 UTC
svn commit: r889167 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/jdbc/
test/java/org/apache/activemq/store/
test/java/org/apache/activemq/store/jdbc/
Author: dejanb
Date: Thu Dec 10 10:05:11 2009
New Revision: 889167
URL: http://svn.apache.org/viewvc?rev=889167&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2519 - JDBC store and duplicate messages
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=889167&r1=889166&r2=889167&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Dec 10 10:05:11 2009
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -48,22 +49,24 @@
protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastMessageId = new AtomicLong(-1);
- protected Map<ProducerId, Long> addedMessages = new HashMap<ProducerId, Long>();
+ protected ActiveMQMessageAudit audit;
- public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
+ public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
super(destination);
this.persistenceAdapter = persistenceAdapter;
this.adapter = adapter;
this.wireFormat = wireFormat;
+ this.audit = audit;
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
MessageId messageId = message.getMessageId();
- Long lastAddedMessage = addedMessages.get(messageId.getProducerId());
- if (lastAddedMessage != null && lastAddedMessage >= messageId.getProducerSequenceId()) {
+ if (audit != null && audit.isDuplicate(message)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Message " + message + " already added to the database. Skipping.");
+ LOG.debug(destination.getPhysicalName()
+ + " ignoring duplicated (add) message, already stored: "
+ + messageId);
}
return;
}
@@ -81,7 +84,6 @@
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
adapter.doAddMessage(c, messageId, destination, data, message.getExpiration());
- addedMessages.put(messageId.getProducerId(), messageId.getProducerSequenceId());
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=889167&r1=889166&r2=889167&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Dec 10 10:05:11 2009
@@ -28,6 +28,7 @@
import javax.sql.DataSource;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
@@ -85,6 +86,11 @@
private boolean createTablesOnStartup = true;
private DataSource lockDataSource;
private int transactionIsolation;
+
+ protected int maxProducersToAudit=1024;
+ protected int maxAuditDepth=1000;
+ protected boolean enableAudit=true;
+ protected ActiveMQMessageAudit audit;
public JDBCPersistenceAdapter() {
}
@@ -119,9 +125,16 @@
private Set<ActiveMQDestination> emptyDestinationSet() {
return Collections.EMPTY_SET;
}
+
+ protected ActiveMQMessageAudit createMessageAudit() {
+ if (enableAudit && audit == null) {
+ audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+ }
+ return audit;
+ }
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
- MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination);
+ MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
@@ -129,7 +142,7 @@
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
- TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination);
+ TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
if (transactionStore != null) {
rc = transactionStore.proxy(rc);
}
@@ -588,4 +601,30 @@
public void setTransactionIsolation(int transactionIsolation) {
this.transactionIsolation = transactionIsolation;
}
+
+ public int getMaxProducersToAudit() {
+ return maxProducersToAudit;
+ }
+
+ public void setMaxProducersToAudit(int maxProducersToAudit) {
+ this.maxProducersToAudit = maxProducersToAudit;
+ }
+
+ public int getMaxAuditDepth() {
+ return maxAuditDepth;
+ }
+
+ public void setMaxAuditDepth(int maxAuditDepth) {
+ this.maxAuditDepth = maxAuditDepth;
+ }
+
+ public boolean isEnableAudit() {
+ return enableAudit;
+ }
+
+ public void setEnableAudit(boolean enableAudit) {
+ this.enableAudit = enableAudit;
+ }
+
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=889167&r1=889166&r2=889167&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Thu Dec 10 10:05:11 2009
@@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
@@ -40,8 +41,8 @@
private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>();
- public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic) {
- super(persistenceAdapter, adapter, wireFormat, topic);
+ public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
+ super(persistenceAdapter, adapter, wireFormat, topic, audit);
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java?rev=889167&r1=889166&r2=889167&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java Thu Dec 10 10:05:11 2009
@@ -32,7 +32,7 @@
*/
abstract public class PersistenceAdapterTestSupport extends TestCase {
- private PersistenceAdapter pa;
+ protected PersistenceAdapter pa;
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java?rev=889167&r1=889166&r2=889167&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java Thu Dec 10 10:05:11 2009
@@ -20,19 +20,10 @@
import junit.framework.AssertionFailedError;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterTestSupport;
import org.apache.derby.jdbc.EmbeddedDataSource;
-/**
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport {
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException {
@@ -47,4 +38,18 @@
return jdbc;
}
+ public void testAuditOff() throws Exception {
+ ((JDBCPersistenceAdapter)pa).setEnableAudit(false);
+ boolean failed = true;
+ try {
+ testStoreCanHandleDupMessages();
+ failed = false;
+ } catch (AssertionFailedError e) {
+ }
+
+ if (!failed) {
+ fail("Should have failed with audit turned off");
+ }
+ }
+
}