You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/07 19:53:55 UTC
svn commit: r822813 - in /activemq/branches/activemq-5.3: ./
activemq-core/src/main/java/org/apache/activemq/store/kahadb/
activemq-core/src/main/java/org/apache/activemq/transport/stomp/
activemq-core/src/test/java/org/apache/activemq/store/ activemq-...
Author: chirino
Date: Wed Oct 7 17:53:55 2009
New Revision: 822813
URL: http://svn.apache.org/viewvc?rev=822813&view=rev
Log:
AMQ-2439: KahaDB + Network of Brokers + Restart = Duplicate Messages that cannot be removed from the data store
- merges in 821090 821103 821115
Added:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
- copied, changed from r821103, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
- copied unchanged from r821115, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
- copied unchanged from r821103, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
Modified:
activemq/branches/activemq-5.3/ (props changed)
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (props changed)
Propchange: activemq/branches/activemq-5.3/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 7 17:53:55 2009
@@ -1 +1 @@
-/activemq/trunk:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764
+/activemq/trunk:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764,821090,821103,821115
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=822813&r1=822812&r2=822813&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Oct 7 17:53:55 2009
@@ -22,7 +22,17 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,13 +59,22 @@
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.index.BTreeVisitor;
+import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.util.*;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LockFile;
+import org.apache.kahadb.util.LongMarshaller;
+import org.apache.kahadb.util.Marshaller;
+import org.apache.kahadb.util.Sequence;
+import org.apache.kahadb.util.SequenceSet;
+import org.apache.kahadb.util.StringMarshaller;
+import org.apache.kahadb.util.VariableMarshaller;
public class MessageDatabase {
@@ -808,12 +827,22 @@
long id = sd.nextMessageId++;
Long previous = sd.locationIndex.put(tx, location, id);
if( previous == null ) {
- sd.messageIdIndex.put(tx, command.getMessageId(), id);
- sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+ previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
+ if( previous == null ) {
+ sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+ } else {
+ // If the message ID is already indexed, then the broker asked us to store a DUP
+ // message. Bad BOY! Don't do it, and log a warning.
+ LOG.warn("Duplicate message add attempt rejected. Message id: "+command.getMessageId());
+ // TODO: consider just rolling back the tx.
+ sd.messageIdIndex.put(tx, command.getMessageId(), previous);
+ }
} else {
// restore the previous value.. Looks like this was a redo of a previously
- // added message. We don't want to assing it a new id as the other indexes would
+ // added message. We don't want to assign it a new id as the other indexes would
// be wrong..
+ //
+ // TODO: consider just rolling back the tx.
sd.locationIndex.put(tx, location, previous);
}
Propchange: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 7 17:53:55 2009
@@ -1 +1 @@
-/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java:502054-818935,818937-819035,820031,820713-820714,820764
+/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java:502054-818935,818937-819035,820031,820713-820714,820764,821090,821103,821115
Copied: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java (from r821103, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java?p2=activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java&r1=821103&r2=822813&rev=822813&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java Wed Oct 7 17:53:55 2009
@@ -54,15 +54,21 @@
MessageStore ms = pa.createQueueMessageStore(new ActiveMQQueue("TEST"));
+ ConnectionContext context = new ConnectionContext();
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText("test");
- message.setMessageId(new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:1"));
- ConnectionContext context = new ConnectionContext();
-
+ MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:1");
+ messageId.setBrokerSequenceId(1);
+ message.setMessageId(messageId);
ms.addMessage(context, message);
// here comes the dup...
+ message = new ActiveMQTextMessage();
+ message.setText("test");
+ messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:1");
+ messageId.setBrokerSequenceId(2);
+ message.setMessageId(messageId);
ms.addMessage(context, message);
final AtomicInteger recovered = new AtomicInteger();
@@ -85,7 +91,6 @@
return true;
}
});
-
assertEquals(1, recovered.get());
}