You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/12/10 11:05:21 UTC

svn commit: r889167 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/ test/java/org/apache/activemq/store/ test/java/org/apache/activemq/store/jdbc/

Author: dejanb
Date: Thu Dec 10 10:05:11 2009
New Revision: 889167

URL: http://svn.apache.org/viewvc?rev=889167&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2519 - JDBC store and duplicate messages

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=889167&r1=889166&r2=889167&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Dec 10 10:05:11 2009
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -48,22 +49,24 @@
     protected final JDBCAdapter adapter;
     protected final JDBCPersistenceAdapter persistenceAdapter;
     protected AtomicLong lastMessageId = new AtomicLong(-1);
-    protected Map<ProducerId, Long> addedMessages = new HashMap<ProducerId, Long>();
+    protected ActiveMQMessageAudit audit;
 
-    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
+    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
         super(destination);
         this.persistenceAdapter = persistenceAdapter;
         this.adapter = adapter;
         this.wireFormat = wireFormat;
+        this.audit = audit;
     }
 
     public void addMessage(ConnectionContext context, Message message) throws IOException {
 
         MessageId messageId = message.getMessageId();
-        Long lastAddedMessage = addedMessages.get(messageId.getProducerId());
-        if (lastAddedMessage != null && lastAddedMessage >= messageId.getProducerSequenceId()) {
+        if (audit != null && audit.isDuplicate(message)) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Message " + message + " already added to the database. Skipping.");
+                LOG.debug(destination.getPhysicalName()
+                    + " ignoring duplicated (add) message, already stored: "
+                    + messageId);
             }
             return;
         }
@@ -81,7 +84,6 @@
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
             adapter.doAddMessage(c, messageId, destination, data, message.getExpiration());
-            addedMessages.put(messageId.getProducerId(), messageId.getProducerSequenceId());
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=889167&r1=889166&r2=889167&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Dec 10 10:05:11 2009
@@ -28,6 +28,7 @@
 
 import javax.sql.DataSource;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
@@ -85,6 +86,11 @@
     private boolean createTablesOnStartup = true;
     private DataSource lockDataSource;
     private int transactionIsolation;
+    
+    protected int maxProducersToAudit=1024;
+    protected int maxAuditDepth=1000;
+    protected boolean enableAudit=true;
+    protected ActiveMQMessageAudit audit;
 
     public JDBCPersistenceAdapter() {
     }
@@ -119,9 +125,16 @@
     private Set<ActiveMQDestination> emptyDestinationSet() {
         return Collections.EMPTY_SET;
     }
+    
+    protected ActiveMQMessageAudit createMessageAudit() {
+    	if (enableAudit && audit == null) {
+    		audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+    	}
+    	return audit;
+    }
 
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination);
+        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
         if (transactionStore != null) {
             rc = transactionStore.proxy(rc);
         }
@@ -129,7 +142,7 @@
     }
 
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
-        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination);
+        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, createMessageAudit());
         if (transactionStore != null) {
             rc = transactionStore.proxy(rc);
         }
@@ -588,4 +601,30 @@
     public void setTransactionIsolation(int transactionIsolation) {
         this.transactionIsolation = transactionIsolation;
     }
+
+	public int getMaxProducersToAudit() {
+		return maxProducersToAudit;
+	}
+
+	public void setMaxProducersToAudit(int maxProducersToAudit) {
+		this.maxProducersToAudit = maxProducersToAudit;
+	}
+
+	public int getMaxAuditDepth() {
+		return maxAuditDepth;
+	}
+
+	public void setMaxAuditDepth(int maxAuditDepth) {
+		this.maxAuditDepth = maxAuditDepth;
+	}
+
+	public boolean isEnableAudit() {
+		return enableAudit;
+	}
+
+	public void setEnableAudit(boolean enableAudit) {
+		this.enableAudit = enableAudit;
+	}
+    
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=889167&r1=889166&r2=889167&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Thu Dec 10 10:05:11 2009
@@ -22,6 +22,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
@@ -40,8 +41,8 @@
 
     private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>();
 
-    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic) {
-        super(persistenceAdapter, adapter, wireFormat, topic);
+    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
+        super(persistenceAdapter, adapter, wireFormat, topic, audit);
     }
 
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java?rev=889167&r1=889166&r2=889167&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java Thu Dec 10 10:05:11 2009
@@ -32,7 +32,7 @@
  */
 abstract public class PersistenceAdapterTestSupport extends TestCase {
 
-    private PersistenceAdapter pa;
+    protected PersistenceAdapter pa;
 
     abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java?rev=889167&r1=889166&r2=889167&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java Thu Dec 10 10:05:11 2009
@@ -20,19 +20,10 @@
 
 import junit.framework.AssertionFailedError;
 
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterTestSupport;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 
-/**
- * 
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
 public class JDBCPersistenceAdapterTest extends PersistenceAdapterTestSupport {
     
     protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException {
@@ -47,4 +38,18 @@
         return jdbc;
     }
     
+    public void testAuditOff() throws Exception {
+    	((JDBCPersistenceAdapter)pa).setEnableAudit(false);
+    	boolean failed = true;
+    	try {
+    		testStoreCanHandleDupMessages();
+    		failed = false;
+    	} catch (AssertionFailedError e) {
+    	}
+    	
+    	if (!failed) {
+    		fail("Should have failed with audit turned off");
+    	}
+    }
+    
 }