You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/12 20:31:52 UTC

svn commit: r694770 - in /activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store: KahaDBPersistenceAdaptor.java MessageDatabase.java

Author: chirino
Date: Fri Sep 12 11:31:51 2008
New Revision: 694770

URL: http://svn.apache.org/viewvc?rev=694770&view=rev
Log:
Added a new index to order messages by a sequence id instead of by the Location.
The problem was that messages in a transaction have a location id that comes earlier
than messages not in a transaction, but those transacted messages should be recovered
after the non-transacted messages.


Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=694770&r1=694769&r2=694770&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java Fri Sep 12 11:31:51 2008
@@ -50,6 +50,7 @@
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.store.MessageDatabase.MessageKeys;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;
 import org.apache.kahadb.store.data.KahaCommitCommand;
 import org.apache.kahadb.store.data.KahaDestination;
@@ -168,7 +169,11 @@
                 location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>(){
                     public Location execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        return sd.messageIdIndex.get(tx, key);
+                        Long sequence = sd.messageIdIndex.get(tx, key);
+                        if( sequence ==null ) {
+                            return null;
+                        }
+                        return sd.orderIndex.get(tx, sequence).location;
                     }
                 });
             }
@@ -186,7 +191,7 @@
                         // Iterate through all index entries to get a count of messages in the destination.
                         StoredDestination sd = getStoredDestination(dest, tx);
                         int rc=0;
-                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+                        for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
                             iterator.next();
                             rc++;
                         }
@@ -201,37 +206,34 @@
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
-                            Entry<Location, String> entry = iterator.next();
-                            listener.recoverMessage( loadMessage(entry.getKey() ) );
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+                            Entry<Long, MessageKeys> entry = iterator.next();
+                            listener.recoverMessage( loadMessage(entry.getValue().location) );
                         }
                     }
                 });
             }
         }
 
-        Location cursorPos=null;
+        long cursorPos=0;
         
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
             synchronized(indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        Entry<Location, String> entry=null;
+                        Entry<Long, MessageKeys> entry=null;
                         int counter = 0;
-                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
                             entry = iterator.next();
-                            listener.recoverMessage( loadMessage(entry.getKey() ) );
+                            listener.recoverMessage( loadMessage(entry.getValue().location ) );
                             counter++;
                             if( counter >= maxReturned ) {
                                 break;
                             }
                         }
                         if( entry!=null ) {
-                            // Copy the location, cause the iterator gives us the key by reference.. changing it
-                            // would mess up the index.
-                            cursorPos = new Location(entry.getKey());
-                            cursorPos.setOffset(entry.getKey().getOffset()+1 );
+                            cursorPos = entry.getKey()+1;
                         }
                     }
                 });
@@ -239,7 +241,7 @@
         }
 
         public void resetBatching() {
-            cursorPos=null;
+            cursorPos=0;
         }
 
         public void setMemoryUsage(MemoryUsage memoeyUSage) {
@@ -330,16 +332,15 @@
                 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
                     public Integer execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        Location cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                         if ( cursorPos==null ) {
                             // The subscription might not exist.
                             return 0;
                         }
-                        cursorPos = new Location(cursorPos);
-                        cursorPos.setOffset(cursorPos.getOffset()+1 );
+                        cursorPos += 1;
                         
                         int counter = 0;
-                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
                             iterator.next();
                             counter++;
                         }
@@ -355,12 +356,12 @@
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        Location cursorPos = new Location(sd.subscriptionAcks.get(tx, subscriptionKey));
-                        cursorPos.setOffset(cursorPos.getOffset()+1 );
+                        Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        cursorPos += 1;
                         
-                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
-                            Entry<Location, String> entry = iterator.next();
-                            listener.recoverMessage( loadMessage(entry.getKey() ) );
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                            Entry<Long, MessageKeys> entry = iterator.next();
+                            listener.recoverMessage( loadMessage(entry.getValue().location ) );
                         }
                     }
                 });
@@ -373,28 +374,24 @@
                 pageFile.tx().execute(new Transaction.Closure<Exception>(){
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        Location cursorPos = sd.subscriptionCursors.get(subscriptionKey);
+                        Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
                         if( cursorPos == null ) {
-                            cursorPos = new Location(sd.subscriptionAcks.get(tx, subscriptionKey));
-                            cursorPos.setOffset(cursorPos.getOffset()+1 );
+                            cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                            cursorPos += 1;
                         }
                         
-                        Entry<Location, String> entry=null;
+                        Entry<Long, MessageKeys> entry=null;
                         int counter = 0;
-                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
                             entry = iterator.next();
-                            listener.recoverMessage( loadMessage(entry.getKey() ) );
+                            listener.recoverMessage( loadMessage(entry.getValue().location ) );
                             counter++;
                             if( counter >= maxReturned ) {
                                 break;
                             }
                         }
                         if( entry!=null ) {
-                            // Copy the location, cause the iterator gives us the key by reference.. changing it
-                            // would mess up the index.
-                            cursorPos = new Location(entry.getKey());
-                            cursorPos.setOffset(entry.getKey().getOffset()+1 );
-                            sd.subscriptionCursors.put(subscriptionKey, cursorPos);
+                            sd.subscriptionCursors.put(subscriptionKey, cursorPos+1);
                         }
                     }
                 });

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=694770&r1=694769&r2=694770&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Fri Sep 12 11:31:51 2008
@@ -41,6 +41,7 @@
 import org.apache.activemq.command.XATransactionId;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.LongMarshaller;
 import org.apache.kahadb.Marshaller;
 import org.apache.kahadb.StringMarshaller;
 import org.apache.kahadb.index.BTreeIndex;
@@ -572,38 +573,43 @@
 
         // Skip adding the message to the index if this is a topic and there are
         // no subscriptions.
-        if (sd.subscriptions != null && sd.ackLocations.isEmpty()) {
+        if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
             return;
         }
 
         // Add the message.
-        sd.orderIndex.put(tx, location, command.getMessageId());
-        sd.messageIdIndex.put(tx, command.getMessageId(), location);
+        long id = sd.nextMessageId++;
+        sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+        sd.locationIndex.put(tx, location, id);
+        sd.messageIdIndex.put(tx, command.getMessageId(), id);
     }
 
     private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         if (!command.hasSubscriptionKey()) {
             // In the queue case we just remove the message from the index..
-            Location messageLocation = sd.messageIdIndex.remove(tx, command.getMessageId());
-            if (messageLocation != null) {
-                sd.orderIndex.remove(tx, messageLocation);
+            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
+            if (sequenceId != null) {
+                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+                if( keys!=null ) {
+                    sd.locationIndex.remove(tx, keys.location);
+                }
             }
         } else {
             // In the topic case we need remove the message once it's been acked
             // by all the subs
-            Location messageLocation = sd.messageIdIndex.get(tx, command.getMessageId());
+            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
 
             // Make sure it's a valid message id...
-            if (messageLocation != null) {
+            if (sequence != null) {
                 String subscriptionKey = command.getSubscriptionKey();
-                Location prev = sd.subscriptionAcks.put(tx, subscriptionKey, messageLocation);
+                Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
 
                 // The following method handles deleting un-referenced messages.
                 removeAckLocation(tx, sd, subscriptionKey, prev);
 
                 // Add it to the new location set.
-                addAckLocation(sd, messageLocation, subscriptionKey);
+                addAckLocation(sd, sequence, subscriptionKey);
             }
 
         }
@@ -612,18 +618,24 @@
     private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         sd.orderIndex.clear(tx);
-        sd.messageIdIndex.clear(tx);
         sd.orderIndex.unload(tx);
-        sd.messageIdIndex.unload(tx);
         tx.free(sd.orderIndex.getPageId());
+        
+        sd.locationIndex.clear(tx);
+        sd.locationIndex.unload(tx);
+        tx.free(sd.locationIndex.getPageId());
+
+        sd.messageIdIndex.clear(tx);
+        sd.messageIdIndex.unload(tx);
         tx.free(sd.messageIdIndex.getPageId());
 
         if (sd.subscriptions != null) {
             sd.subscriptions.clear(tx);
-            sd.subscriptionAcks.clear(tx);
             sd.subscriptions.unload(tx);
-            sd.subscriptionAcks.unload(tx);
             tx.free(sd.subscriptions.getPageId());
+
+            sd.subscriptionAcks.clear(tx);
+            sd.subscriptionAcks.unload(tx);
             tx.free(sd.subscriptionAcks.getPageId());
         }
 
@@ -639,11 +651,9 @@
         if (command.hasSubscriptionInfo()) {
             String subscriptionKey = command.getSubscriptionKey();
             sd.subscriptions.put(tx, subscriptionKey, command);
-            Location ackLocation;
-            if (command.getRetroactive()) {
-                ackLocation = new Location(0, 0);
-            } else {
-                ackLocation = location;
+            long ackLocation=-1;
+            if (!command.getRetroactive()) {
+                ackLocation = sd.nextMessageId-1;
             }
 
             sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
@@ -652,8 +662,10 @@
             // delete the sub...
             String subscriptionKey = command.getSubscriptionKey();
             sd.subscriptions.remove(tx, subscriptionKey);
-            Location prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
-            removeAckLocation(tx, sd, subscriptionKey, prev);
+            Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
+            if( prev!=null ) {
+                removeAckLocation(tx, sd, subscriptionKey, prev);
+            }
         }
 
     }
@@ -672,7 +684,7 @@
         
         for (StoredDestination sd : storedDestinations.values()) {
             // Use a visitor to cut down the number of pages that we load
-            sd.orderIndex.visit(tx, new BTreeVisitor<Location, String>() {
+            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                 int last=-1;
                 public boolean isInterestedInKeysBetween(Location first, Location second) {
                     if( second!=null ) {
@@ -687,7 +699,7 @@
                     return true;
                 }
 
-                public void visit(List<Location> keys, List<String> values) {
+                public void visit(List<Location> keys, List<Long> values) {
                     for (int i = 0; i < keys.size(); i++) {
                         if( last != keys.get(i).getDataFileId() ) {
                             inUseFiles.add(keys.get(i).getDataFileId());
@@ -730,16 +742,45 @@
         Location lastAckLocation;
         Location cursor;
     }
+    
+    static class MessageKeys {
+        final String messageId;
+        final Location location;
+        
+        public MessageKeys(String messageId, Location location) {
+            this.messageId=messageId;
+            this.location=location;
+        }
+    }
+    
+    static protected class MessageKeysMarshaller implements Marshaller<MessageKeys> {
+        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
+        
+        public Class<MessageKeys> getType() {
+            return MessageKeys.class;
+        }
 
+        public MessageKeys readPayload(DataInput dataIn) throws IOException {
+            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
+        }
+
+        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
+            dataOut.writeUTF(object.messageId);
+            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
+        }
+    }
+    
     static class StoredDestination {
-        BTreeIndex<Location, String> orderIndex;
-        BTreeIndex<String, Location> messageIdIndex;
+        long nextMessageId;
+        BTreeIndex<Long, MessageKeys> orderIndex;
+        BTreeIndex<Location, Long> locationIndex;
+        BTreeIndex<String, Long> messageIdIndex;
 
         // These bits are only set for Topics
         BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
-        BTreeIndex<String, Location> subscriptionAcks;
-        HashMap<String, Location> subscriptionCursors;
-        TreeMap<Location, HashSet<String>> ackLocations;
+        BTreeIndex<String, Long> subscriptionAcks;
+        HashMap<String, Long> subscriptionCursors;
+        TreeMap<Long, HashSet<String>> ackPositions;
     }
 
     protected class StoredDestinationMarshaller implements Marshaller<StoredDestination> {
@@ -749,18 +790,20 @@
 
         public StoredDestination readPayload(DataInput dataIn) throws IOException {
             StoredDestination value = new StoredDestination();
-            value.orderIndex = new BTreeIndex<Location, String>(pageFile, dataIn.readLong());
-            value.messageIdIndex = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
+            value.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
+            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
 
             if (dataIn.readBoolean()) {
                 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
-                value.subscriptionAcks = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
+                value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
             }
             return value;
         }
 
         public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
             dataOut.writeLong(value.orderIndex.getPageId());
+            dataOut.writeLong(value.locationIndex.getPageId());
             dataOut.writeLong(value.messageIdIndex.getPageId());
             if (value.subscriptions != null) {
                 dataOut.writeBoolean(true);
@@ -841,25 +884,36 @@
         if (rc == null) {
             // Brand new destination.. allocate indexes for it.
             rc = new StoredDestination();
-            rc.orderIndex = new BTreeIndex<Location, String>(pageFile, tx.allocate());
-            rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile, tx.allocate());
+            rc.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
+            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
 
             if (topic) {
                 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
-                rc.subscriptionAcks = new BTreeIndex<String, Location>(pageFile, tx.allocate());
+                rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
             }
             metadata.destinations.put(tx, key, rc);
         }
 
         // Configure the marshalers and load.
-        rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
-        rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
+        rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+        rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
         rc.orderIndex.load(tx);
 
+        // Figure out the next key using the last entry in the destination.
+        Entry<Long, MessageKeys> lastEntry = rc.orderIndex.getLast(tx);
+        if( lastEntry!=null ) {
+            rc.nextMessageId = lastEntry.getKey()+1;
+        }
+
+        rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
+        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        rc.locationIndex.load(tx);
+
         rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
-        rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
+        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
         rc.messageIdIndex.load(tx);
-
+        
         // If it was a topic...
         if (topic) {
 
@@ -868,14 +922,14 @@
             rc.subscriptions.load(tx);
 
             rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
-            rc.subscriptionAcks.setValueMarshaller(LocationMarshaller.INSTANCE);
+            rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
             rc.subscriptionAcks.load(tx);
 
-            rc.ackLocations = new TreeMap<Location, HashSet<String>>();
-            rc.subscriptionCursors = new HashMap<String, Location>();
+            rc.ackPositions = new TreeMap<Long, HashSet<String>>();
+            rc.subscriptionCursors = new HashMap<String, Long>();
 
-            for (Iterator<Entry<String, Location>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
-                Entry<String, Location> entry = iterator.next();
+            for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
+                Entry<String, Long> entry = iterator.next();
                 addAckLocation(rc, entry.getValue(), entry.getKey());
             }
 
@@ -885,14 +939,14 @@
 
     /**
      * @param sd
-     * @param messageLocation
+     * @param messageSequence
      * @param subscriptionKey
      */
-    private void addAckLocation(StoredDestination sd, Location messageLocation, String subscriptionKey) {
-        HashSet<String> hs = sd.ackLocations.get(messageLocation);
+    private void addAckLocation(StoredDestination sd, Long messageSequence, String subscriptionKey) {
+        HashSet<String> hs = sd.ackPositions.get(messageSequence);
         if (hs == null) {
             hs = new HashSet<String>();
-            sd.ackLocations.put(messageLocation, hs);
+            sd.ackPositions.put(messageSequence, hs);
         }
         hs.add(subscriptionKey);
     }
@@ -901,18 +955,18 @@
      * @param tx
      * @param sd
      * @param subscriptionKey
-     * @param location
+     * @param sequenceId
      * @throws IOException
      */
-    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Location location) throws IOException {
+    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
         // Remove the sub from the previous location set..
-        if (location != null) {
-            HashSet<String> hs = sd.ackLocations.get(location);
+        if (sequenceId != null) {
+            HashSet<String> hs = sd.ackPositions.get(sequenceId);
             if (hs != null) {
                 hs.remove(subscriptionKey);
                 if (hs.isEmpty()) {
-                    HashSet<String> firstSet = sd.ackLocations.values().iterator().next();
-                    sd.ackLocations.remove(location);
+                    HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
+                    sd.ackPositions.remove(sequenceId);
 
                     // Did we just empty out the first set in the
                     // ordered list of ack locations? Then it's time to
@@ -920,10 +974,10 @@
                     if (hs == firstSet) {
 
                         // Find all the entries that need to get deleted.
-                        ArrayList<Entry<Location, String>> deletes = new ArrayList<Entry<Location, String>>();
-                        for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
-                            Entry<Location, String> entry = iterator.next();
-                            while (entry.getKey().compareTo(location) <= 0) {
+                        ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+                            Entry<Long, MessageKeys> entry = iterator.next();
+                            while (entry.getKey().compareTo(sequenceId) <= 0) {
                                 // We don't do the actually delete while we are
                                 // iterating the BTree since
                                 // iterating would fail.
@@ -932,11 +986,11 @@
                         }
 
                         // Do the actual deletes.
-                        for (Entry<Location, String> entry : deletes) {
-                            sd.messageIdIndex.remove(tx, entry.getValue());
-                            sd.orderIndex.remove(tx, entry.getKey());
+                        for (Entry<Long, MessageKeys> entry : deletes) {
+                            sd.locationIndex.remove(tx, entry.getValue().location);
+                            sd.messageIdIndex.remove(tx,entry.getValue().messageId);
+                            sd.orderIndex.remove(tx,entry.getKey());
                         }
-
                     }
                 }
             }