You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/02 18:46:38 UTC

svn commit: r821090 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Author: chirino
Date: Fri Oct  2 16:46:37 2009
New Revision: 821090

URL: http://svn.apache.org/viewvc?rev=821090&view=rev
Log:
AMQ-2439: KahaDB + Network of Brokers + Restart = Duplicate Messages that cannot be removed from the data store

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=821090&r1=821089&r2=821090&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Oct  2 16:46:37 2009
@@ -22,7 +22,17 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -49,13 +59,22 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.index.BTreeVisitor;
+import org.apache.kahadb.journal.DataFile;
 import org.apache.kahadb.journal.Journal;
 import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.journal.DataFile;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.util.*;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LockFile;
+import org.apache.kahadb.util.LongMarshaller;
+import org.apache.kahadb.util.Marshaller;
+import org.apache.kahadb.util.Sequence;
+import org.apache.kahadb.util.SequenceSet;
+import org.apache.kahadb.util.StringMarshaller;
+import org.apache.kahadb.util.VariableMarshaller;
 
 public class MessageDatabase {
 
@@ -808,12 +827,24 @@
         long id = sd.nextMessageId++;
         Long previous = sd.locationIndex.put(tx, location, id);
         if( previous == null ) {
-            sd.messageIdIndex.put(tx, command.getMessageId(), id);
-            sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
+            if( previous == null ) {
+                sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+            } else {
+                // If the message ID as indexed, then the broker asked us to store a DUP
+                // message.  Bad BOY!  Don't do it, and log a warning.
+
+                LOG.warn("Duplicate message add attempt rejected. Message id: "+command.getMessageId()+", on: "+command.getDestination());
+                
+                // TODO: consider just rolling back the tx.
+                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
+            }
         } else {
             // restore the previous value.. Looks like this was a redo of a previously
-            // added message.  We don't want to assing it a new id as the other indexes would 
+            // added message.  We don't want to assign it a new id as the other indexes would 
             // be wrong..
+            //
+            // TODO: consider just rolling back the tx.
             sd.locationIndex.put(tx, location, previous);
         }