You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/12/01 18:20:29 UTC

svn commit: r885841 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java

Author: dejanb
Date: Tue Dec  1 17:20:29 2009
New Revision: 885841

URL: http://svn.apache.org/viewvc?rev=885841&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2519 - duplicate messages and jdbc store

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=885841&r1=885840&r2=885841&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Tue Dec  1 17:20:29 2009
@@ -18,6 +18,8 @@
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.broker.ConnectionContext;
@@ -25,24 +27,28 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.10 $
  */
 public class JDBCMessageStore extends AbstractMessageStore {
 
+    private static final Log LOG = LogFactory.getLog(JDBCMessageStore.class);
     protected final WireFormat wireFormat;
     protected final JDBCAdapter adapter;
     protected final JDBCPersistenceAdapter persistenceAdapter;
     protected AtomicLong lastMessageId = new AtomicLong(-1);
+    protected Map<ProducerId, Long> addedMessages = new HashMap<ProducerId, Long>();
 
     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
         super(destination);
@@ -53,22 +59,32 @@
 
     public void addMessage(ConnectionContext context, Message message) throws IOException {
 
+        MessageId messageId = message.getMessageId();
+        Long lastAddedMessage = addedMessages.get(messageId.getProducerId());
+        if (lastAddedMessage != null && lastAddedMessage >= messageId.getProducerSequenceId()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Message " + message + " already added to the database. Skipping.");
+            }
+            return;
+        }
+        
         // Serialize the Message..
         byte data[];
         try {
             ByteSequence packet = wireFormat.marshal(message);
             data = ByteSequenceData.toByteArray(packet);
         } catch (IOException e) {
-            throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: " + e, e);
+            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
         }
 
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
-            adapter.doAddMessage(c, message.getMessageId(), destination, data, message.getExpiration());
+            adapter.doAddMessage(c, messageId, destination, data, message.getExpiration());
+            addedMessages.put(messageId.getProducerId(), messageId.getProducerSequenceId());
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
-            throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: " + e, e);
+            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
         } finally {
             c.close();
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java?rev=885841&r1=885840&r2=885841&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java Tue Dec  1 17:20:29 2009
@@ -20,6 +20,11 @@
 
 import junit.framework.AssertionFailedError;
 
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterTestSupport;
 import org.apache.derby.jdbc.EmbeddedDataSource;
@@ -42,14 +47,4 @@
         return jdbc;
     }
     
-    @Override
-    public void testStoreCanHandleDupMessages() throws Exception {
-        try {
-            super.testStoreCanHandleDupMessages();
-            fail("We expect this test to fail as it would be too expensive to add additional " +
-                 "unique constraints in the JDBC implementation to detect the duplicate messages.");
-        } catch (AssertionFailedError expected) {
-        }
-    }
-    
 }