You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/02 18:46:38 UTC
svn commit: r821090 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Author: chirino
Date: Fri Oct 2 16:46:37 2009
New Revision: 821090
URL: http://svn.apache.org/viewvc?rev=821090&view=rev
Log:
AMQ-2439: KahaDB + Network of Brokers + Restart = Duplicate Messages that cannot be removed from the data store
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=821090&r1=821089&r2=821090&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Oct 2 16:46:37 2009
@@ -22,7 +22,17 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,13 +59,22 @@
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.index.BTreeVisitor;
+import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.util.*;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LockFile;
+import org.apache.kahadb.util.LongMarshaller;
+import org.apache.kahadb.util.Marshaller;
+import org.apache.kahadb.util.Sequence;
+import org.apache.kahadb.util.SequenceSet;
+import org.apache.kahadb.util.StringMarshaller;
+import org.apache.kahadb.util.VariableMarshaller;
public class MessageDatabase {
@@ -808,12 +827,24 @@
long id = sd.nextMessageId++;
Long previous = sd.locationIndex.put(tx, location, id);
if( previous == null ) {
- sd.messageIdIndex.put(tx, command.getMessageId(), id);
- sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+ previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
+ if( previous == null ) {
+ sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+ } else {
+ // If the message ID is already indexed, then the broker asked us to store a
+ // duplicate (DUP) message. Bad BOY! Don't do it, and log a warning.
+
+ LOG.warn("Duplicate message add attempt rejected. Message id: "+command.getMessageId()+", on: "+command.getDestination());
+
+ // TODO: consider just rolling back the tx.
+ sd.messageIdIndex.put(tx, command.getMessageId(), previous);
+ }
} else {
// restore the previous value.. Looks like this was a redo of a previously
- // added message. We don't want to assing it a new id as the other indexes would
+ // added message. We don't want to assign it a new id as the other indexes would
// be wrong..
+ //
+ // TODO: consider just rolling back the tx.
sd.locationIndex.put(tx, location, previous);
}