You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/12/01 18:20:29 UTC
svn commit: r885841 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
Author: dejanb
Date: Tue Dec 1 17:20:29 2009
New Revision: 885841
URL: http://svn.apache.org/viewvc?rev=885841&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2519 - duplicate messages and jdbc store
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=885841&r1=885840&r2=885841&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Tue Dec 1 17:20:29 2009
@@ -18,6 +18,8 @@
import java.io.IOException;
import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.ConnectionContext;
@@ -25,24 +27,28 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.10 $
*/
public class JDBCMessageStore extends AbstractMessageStore {
+ private static final Log LOG = LogFactory.getLog(JDBCMessageStore.class);
protected final WireFormat wireFormat;
protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastMessageId = new AtomicLong(-1);
+ protected Map<ProducerId, Long> addedMessages = new HashMap<ProducerId, Long>();
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
super(destination);
@@ -53,22 +59,32 @@
public void addMessage(ConnectionContext context, Message message) throws IOException {
+ MessageId messageId = message.getMessageId();
+ Long lastAddedMessage = addedMessages.get(messageId.getProducerId());
+ if (lastAddedMessage != null && lastAddedMessage >= messageId.getProducerSequenceId()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Message " + message + " already added to the database. Skipping.");
+ }
+ return;
+ }
+
// Serialize the Message..
byte data[];
try {
ByteSequence packet = wireFormat.marshal(message);
data = ByteSequenceData.toByteArray(packet);
} catch (IOException e) {
- throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: " + e, e);
+ throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
}
// Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
- adapter.doAddMessage(c, message.getMessageId(), destination, data, message.getExpiration());
+ adapter.doAddMessage(c, messageId, destination, data, message.getExpiration());
+ addedMessages.put(messageId.getProducerId(), messageId.getProducerSequenceId());
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
- throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: " + e, e);
+ throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
} finally {
c.close();
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java?rev=885841&r1=885840&r2=885841&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java Tue Dec 1 17:20:29 2009
@@ -20,6 +20,11 @@
import junit.framework.AssertionFailedError;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterTestSupport;
import org.apache.derby.jdbc.EmbeddedDataSource;
@@ -42,14 +47,4 @@
return jdbc;
}
- @Override
- public void testStoreCanHandleDupMessages() throws Exception {
- try {
- super.testStoreCanHandleDupMessages();
- fail("We expect this test to fail as it would be too expensive to add additional " +
- "unique constraints in the JDBC implementation to detect the duplicate messages.");
- } catch (AssertionFailedError expected) {
- }
- }
-
}