You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/07/29 16:52:04 UTC

svn commit: r980458 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ main/proto/ test/java/org/apache/activemq/store/kahadb/

Author: rajdavies
Date: Thu Jul 29 14:52:03 2010
New Revision: 980458

URL: http://svn.apache.org/viewvc?rev=980458&view=rev
Log:
changes for https://issues.apache.org/activemq/browse/AMQ-2789 - message priority

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/main/proto/journal-data.proto
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=980458&r1=980457&r2=980458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Jul 29 14:52:03 2010
@@ -98,7 +98,7 @@ public class KahaDBStore extends Message
     protected ExecutorService topicExecutor;
     protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
     protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
-    private final WireFormat wireFormat = new OpenWireFormat();
+    final WireFormat wireFormat = new OpenWireFormat();
     private SystemUsage usageManager;
     private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
     private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
@@ -368,7 +368,8 @@ public class KahaDBStore extends Message
             command.setDestination(dest);
             command.setMessageId(message.getMessageId().toString());
             command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
-
+            command.setPriority(message.getPriority());
+            command.setPrioritySupported(isPrioritizedMessages());
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
             store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
@@ -472,10 +473,12 @@ public class KahaDBStore extends Message
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
+                        sd.orderIndex.resetCursorPosition();
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
-                            listener.recoverMessage(loadMessage(entry.getValue().location));
+                            Message msg = loadMessage(entry.getValue().location);
+                            listener.recoverMessage(msg);
                         }
                     }
                 });
@@ -484,8 +487,7 @@ public class KahaDBStore extends Message
             }
         }
 
-        long cursorPos = 0;
-
+        
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
             indexLock.readLock().lock();
             try {
@@ -494,19 +496,19 @@ public class KahaDBStore extends Message
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Entry<Long, MessageKeys> entry = null;
                         int counter = 0;
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                                 .hasNext()
                                 && listener.hasSpace();) {
                             entry = iterator.next();
-                            listener.recoverMessage(loadMessage(entry.getValue().location));
+                            Message msg = loadMessage(entry.getValue().location);
+                            //System.err.println("RECOVER " + msg.getMessageId().getProducerSequenceId());
+                            listener.recoverMessage(msg);
                             counter++;
                             if (counter >= maxReturned || listener.hasSpace() == false) {
                                 break;
                             }
                         }
-                        if (entry != null) {
-                            cursorPos = entry.getKey() + 1;
-                        }
+                        sd.orderIndex.stoppedIterating();
                     }
                 });
             }finally {
@@ -515,7 +517,15 @@ public class KahaDBStore extends Message
         }
 
         public void resetBatching() {
-            cursorPos = 0;
+            try {
+                pageFile.tx().execute(new Transaction.Closure<Exception>() {
+                    public void execute(Transaction tx) throws Exception {
+                StoredDestination sd = getStoredDestination(dest, tx);
+                sd.orderIndex.resetCursorPosition();}
+                    });
+            } catch (Exception e) {
+                LOG.error("Failed to reset batching",e);
+            }
         }
 
         @Override
@@ -527,21 +537,22 @@ public class KahaDBStore extends Message
                 // Hopefully one day the page file supports concurrent read
                 // operations... but for now we must
                 // externally synchronize...
-                Long location;
+               
                 indexLock.readLock().lock();
                 try {
-                    location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
-                        public Long execute(Transaction tx) throws IOException {
+                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        public void execute(Transaction tx) throws IOException {
                             StoredDestination sd = getStoredDestination(dest, tx);
-                            return sd.messageIdIndex.get(tx, key);
+                            Long location = sd.messageIdIndex.get(tx, key);
+                            if (location != null) {
+                                sd.orderIndex.setBatch(tx, location);
+                            }
                         }
                     });
                 }finally {
                     indexLock.readLock().unlock();
                 }
-                if (location != null) {
-                    cursorPos = location + 1;
-                }
+                
             } finally {
                 unlockAsyncJobQueue();
             }
@@ -723,7 +734,7 @@ public class KahaDBStore extends Message
                             // The subscription might not exist.
                             return 0;
                         }
-                        cursorPos += 1;
+                        MessageOrderCursor moc = new MessageOrderCursor(cursorPos + 1);
 
                         int counter = 0;
                         try {
@@ -732,7 +743,7 @@ public class KahaDBStore extends Message
                             if (selector != null) {
                                 selectorExpression = SelectorParser.parse(selector);
                             }
-                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
                                     .hasNext();) {
                                 Entry<Long, MessageKeys> entry = iterator.next();
                                 if (selectorExpression != null) {
@@ -765,9 +776,8 @@ public class KahaDBStore extends Message
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                        cursorPos += 1;
-
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+                        MessageOrderCursor moc = new MessageOrderCursor(cursorPos + 1);
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
                             listener.recoverMessage(loadMessage(entry.getValue().location));
@@ -787,15 +797,15 @@ public class KahaDBStore extends Message
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
-                        if (cursorPos == null) {
-                            cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
-                            cursorPos += 1;
+                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
+                        if (moc == null) {
+                            long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
+                            moc = new MessageOrderCursor(pos+1);
                         }
 
                         Entry<Long, MessageKeys> entry = null;
                         int counter = 0;
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
                                 .hasNext();) {
                             entry = iterator.next();
                             if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
@@ -806,7 +816,9 @@ public class KahaDBStore extends Message
                             }
                         }
                         if (entry != null) {
-                            sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
+                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
+                            copy.increment();
+                            sd.subscriptionCursors.put(subscriptionKey, copy);
                         }
                     }
                 });

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=980458&r1=980457&r2=980458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Jul 29 14:52:03 2010
@@ -94,6 +94,7 @@ public class MessageDatabase extends Ser
     static final int CLOSED_STATE = 1;
     static final int OPEN_STATE = 2;
     static final long NOT_ACKED = -1;
+    static final int VERSION = 2;
 
 
     protected class Metadata {
@@ -104,7 +105,7 @@ public class MessageDatabase extends Ser
         protected Location firstInProgressTransactionLocation;
         protected Location producerSequenceIdTrackerLocation = null;
         protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
-
+        protected int version = VERSION;
         public void read(DataInput is) throws IOException {
             state = is.readInt();
             destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
@@ -126,6 +127,12 @@ public class MessageDatabase extends Ser
                 }
             } catch (EOFException expectedOnUpgrade) {
             }
+            try {
+               version = is.readInt();
+            }catch (EOFException expectedOnUpgrade) {
+                version=1;
+            }
+            LOG.info("KahaDB is version " + version);
         }
 
         public void write(DataOutput os) throws IOException {
@@ -152,6 +159,9 @@ public class MessageDatabase extends Ser
             } else {
                 os.writeBoolean(false);
             }
+            if (version > 1) {
+               os.writeInt(version);
+            }
         }
     }
 
@@ -974,22 +984,26 @@ public class MessageDatabase extends Ser
         }
 
         // Add the message.
-        long id = sd.nextMessageId++;
+        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
+        long id = sd.orderIndex.getNextMessageId(priority);
         Long previous = sd.locationIndex.put(tx, location, id);
-        if( previous == null ) {
+        if (previous == null) {
             previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
-            if( previous == null ) {
-                sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));            
+            if (previous == null) {
+                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
             } else {
-                // If the message ID as indexed, then the broker asked us to store a DUP
-                // message.  Bad BOY!  Don't do it, and log a warning.
-                LOG.warn("Duplicate message add attempt rejected. Message id: "+command.getMessageId());
+                // If the message ID as indexed, then the broker asked us to
+                // store a DUP
+                // message. Bad BOY! Don't do it, and log a warning.
+                LOG.warn("Duplicate message add attempt rejected. Message id: " + command.getMessageId());
                 // TODO: consider just rolling back the tx.
                 sd.messageIdIndex.put(tx, command.getMessageId(), previous);
             }
         } else {
-            // restore the previous value.. Looks like this was a redo of a previously
-            // added message.  We don't want to assign it a new id as the other indexes would 
+            // restore the previous value.. Looks like this was a redo of a
+            // previously
+            // added message. We don't want to assign it a new id as the other
+            // indexes would
             // be wrong..
             //
             // TODO: consider just rolling back the tx.
@@ -1049,9 +1063,7 @@ public class MessageDatabase extends Ser
 
     void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
-        sd.orderIndex.clear(tx);
-        sd.orderIndex.unload(tx);
-        tx.free(sd.orderIndex.getPageId());
+        sd.orderIndex.remove(tx);
         
         sd.locationIndex.clear(tx);
         sd.locationIndex.unload(tx);
@@ -1085,7 +1097,7 @@ public class MessageDatabase extends Ser
             sd.subscriptions.put(tx, subscriptionKey, command);
             long ackLocation=NOT_ACKED;
             if (!command.getRetroactive()) {
-                ackLocation = sd.nextMessageId-1;
+                ackLocation = sd.orderIndex.nextMessageId-1;
             }
 
             sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
@@ -1273,17 +1285,17 @@ public class MessageDatabase extends Ser
             LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
         }
     }
-    
-    static class StoredDestination {
-        long nextMessageId;
-        BTreeIndex<Long, MessageKeys> orderIndex;
+ 
+    class StoredDestination {
+        
+        MessageOrderIndex orderIndex = new MessageOrderIndex();
         BTreeIndex<Location, Long> locationIndex;
         BTreeIndex<String, Long> messageIdIndex;
 
         // These bits are only set for Topics
         BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
         BTreeIndex<String, Long> subscriptionAcks;
-        HashMap<String, Long> subscriptionCursors;
+        HashMap<String, MessageOrderCursor> subscriptionCursors;
         TreeMap<Long, HashSet<String>> ackPositions;
     }
 
@@ -1291,7 +1303,7 @@ public class MessageDatabase extends Ser
 
         public StoredDestination readPayload(DataInput dataIn) throws IOException {
             StoredDestination value = new StoredDestination();
-            value.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
             value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
             value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
 
@@ -1299,11 +1311,15 @@ public class MessageDatabase extends Ser
                 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
                 value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
             }
+            if (metadata.version >= 2) {
+                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+            }
             return value;
         }
 
         public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
-            dataOut.writeLong(value.orderIndex.getPageId());
+            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
             dataOut.writeLong(value.locationIndex.getPageId());
             dataOut.writeLong(value.messageIdIndex.getPageId());
             if (value.subscriptions != null) {
@@ -1313,6 +1329,10 @@ public class MessageDatabase extends Ser
             } else {
                 dataOut.writeBoolean(false);
             }
+            if (metadata.version >= 2) {
+                dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
+                dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
+            }
         }
     }
 
@@ -1385,7 +1405,7 @@ public class MessageDatabase extends Ser
         if (rc == null) {
             // Brand new destination.. allocate indexes for it.
             rc = new StoredDestination();
-            rc.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+            rc.orderIndex.allocate(tx);
             rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
             rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
 
@@ -1397,15 +1417,10 @@ public class MessageDatabase extends Ser
         }
 
         // Configure the marshalers and load.
-        rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-        rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
         rc.orderIndex.load(tx);
 
         // Figure out the next key using the last entry in the destination.
-        Entry<Long, MessageKeys> lastEntry = rc.orderIndex.getLast(tx);
-        if( lastEntry!=null ) {
-            rc.nextMessageId = lastEntry.getKey()+1;
-        }
+        rc.orderIndex.configureLast(tx);
 
         rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
         rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
@@ -1427,19 +1442,19 @@ public class MessageDatabase extends Ser
             rc.subscriptionAcks.load(tx);
 
             rc.ackPositions = new TreeMap<Long, HashSet<String>>();
-            rc.subscriptionCursors = new HashMap<String, Long>();
+            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
 
             for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
                 Entry<String, Long> entry = iterator.next();
                 addAckLocation(rc, entry.getValue(), entry.getKey());
             }
             
-            if (rc.nextMessageId == 0) {
+            if (rc.orderIndex.nextMessageId == 0) {
                 // check for existing durable sub all acked out - pull next seq from acks as messages are gone
                 if (!rc.ackPositions.isEmpty()) {
                     Long lastAckedMessageId = rc.ackPositions.lastKey();
                     if (lastAckedMessageId != NOT_ACKED) {
-                        rc.nextMessageId = lastAckedMessageId+1;
+                        rc.orderIndex.nextMessageId = lastAckedMessageId+1;
                     }
                 }
             }
@@ -1486,18 +1501,7 @@ public class MessageDatabase extends Ser
 
                         // Find all the entries that need to get deleted.
                         ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
-                            Entry<Long, MessageKeys> entry = iterator.next();
-                            if (entry.getKey().compareTo(sequenceId) <= 0) {
-                                // We don't do the actually delete while we are
-                                // iterating the BTree since
-                                // iterating would fail.
-                                deletes.add(entry);
-                            }else {
-                                //no point in iterating the in-order sequences anymore
-                                break;
-                            }
-                        }
+                        sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
 
                         // Do the actual deletes.
                         for (Entry<Long, MessageKeys> entry : deletes) {
@@ -1816,4 +1820,337 @@ public class MessageDatabase extends Ser
     public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
         this.databaseLockedWaitDelay = databaseLockedWaitDelay;
     }
+    
+    
+    class MessageOrderCursor{
+        long defaultCursorPosition;
+        long lowPriorityCursorPosition;
+        long highPriorityCursorPosition;
+        MessageOrderCursor(){
+        }
+        
+        MessageOrderCursor(long position){
+            this.defaultCursorPosition=position;
+            this.lowPriorityCursorPosition=position;
+            this.highPriorityCursorPosition=position;
+        }
+        
+        MessageOrderCursor(MessageOrderCursor other){
+            this.defaultCursorPosition=other.defaultCursorPosition;
+            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
+            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
+        }
+        
+        MessageOrderCursor copy() {
+            return new MessageOrderCursor(this);
+        }
+        
+        void reset() {
+            this.defaultCursorPosition=0;
+            this.highPriorityCursorPosition=0;
+            this.lowPriorityCursorPosition=0;
+        }
+        
+        void increment() {
+            if (defaultCursorPosition!=0) {
+                defaultCursorPosition++;
+            }
+            if (highPriorityCursorPosition!=0) {
+                highPriorityCursorPosition++;
+            }
+            if (lowPriorityCursorPosition!=0) {
+                lowPriorityCursorPosition++;
+            }
+        }
+    }
+    
+    class MessageOrderIndex{
+        long nextMessageId;
+        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
+        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
+        BTreeIndex<Long, MessageKeys> highPriorityIndex;
+        MessageOrderCursor cursor = new MessageOrderCursor();
+        Long lastDefaultKey;
+        Long lastHighKey;
+        Long lastLowKey;
+        
+        
+        MessageKeys remove(Transaction tx, Long key) throws IOException {
+            MessageKeys result = defaultPriorityIndex.remove(tx, key);
+            if (result == null && highPriorityIndex!=null) {
+                result = highPriorityIndex.remove(tx, key);
+                if (result ==null && lowPriorityIndex!=null) {
+                    result = lowPriorityIndex.remove(tx, key);
+                }
+            }
+            return result;
+        }
+        
+        void load(Transaction tx) throws IOException {
+            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+            defaultPriorityIndex.load(tx);
+            if (metadata.version >= 2) {
+                lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+                lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                lowPriorityIndex.load(tx);
+
+                highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+                highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                highPriorityIndex.load(tx);
+            }
+        }
+        
+        void allocate(Transaction tx) throws IOException {
+            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+            if (metadata.version >= 2) {
+                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+            }
+        }
+        
+        void configureLast(Transaction tx) throws IOException {
+            // Figure out the next key using the last entry in the destination.
+            if (highPriorityIndex != null) {
+                Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
+                if (lastEntry != null) {
+                    nextMessageId = lastEntry.getKey() + 1;
+                } else {
+                    lastEntry = defaultPriorityIndex.getLast(tx);
+                    if (lastEntry != null) {
+                        nextMessageId = lastEntry.getKey() + 1;
+                    } else {
+                        lastEntry = lowPriorityIndex.getLast(tx);
+                        if (lastEntry != null) {
+                            nextMessageId = lastEntry.getKey() + 1;
+                        }
+                    }
+                }
+            } else {
+                Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
+                if (lastEntry != null) {
+                    nextMessageId = lastEntry.getKey() + 1;
+                }
+            }
+        }
+        
+               
+        void remove(Transaction tx) throws IOException {
+            defaultPriorityIndex.clear(tx);
+            defaultPriorityIndex.unload(tx);
+            tx.free(defaultPriorityIndex.getPageId());
+            if (lowPriorityIndex != null) {
+                lowPriorityIndex.clear(tx);
+                lowPriorityIndex.unload(tx);
+
+                tx.free(lowPriorityIndex.getPageId());
+            }
+            if (highPriorityIndex != null) {
+                highPriorityIndex.clear(tx);
+                highPriorityIndex.unload(tx);
+                tx.free(highPriorityIndex.getPageId());
+            }
+        }
+        
+        void resetCursorPosition() {
+            this.cursor.reset();
+            lastDefaultKey = null;
+            lastHighKey = null;
+            lastLowKey = null;
+        }
+        
+        void setBatch(Transaction tx, Long sequence) throws IOException {
+            if (sequence != null) {
+                Long nextPosition = new Long(sequence.longValue() + 1);
+                if (defaultPriorityIndex.containsKey(tx, sequence)) {
+                    lastDefaultKey = nextPosition;
+                    cursor.defaultCursorPosition = nextPosition.longValue();
+                } else if (highPriorityIndex != null) {
+                    if (highPriorityIndex.containsKey(tx, sequence)) {
+                        lastHighKey = nextPosition;
+                        cursor.highPriorityCursorPosition = nextPosition.longValue();
+                    } else if (lowPriorityIndex.containsKey(tx, sequence)) {
+                        lastLowKey = nextPosition;
+                        cursor.lowPriorityCursorPosition = nextPosition.longValue();
+                    }
+                } else {
+                    lastDefaultKey = nextPosition;
+                    cursor.defaultCursorPosition = nextPosition.longValue();
+                }
+            }
+        }
+        
+        void stoppedIterating() {
+            if (lastDefaultKey!=null) {
+                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
+            }
+            if (lastHighKey!=null) {
+                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
+            }
+            if (lastLowKey!=null) {
+                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
+            }
+            lastDefaultKey = null;
+            lastHighKey = null;
+            lastLowKey = null;
+        }
+        
+        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
+                throws IOException {
+            getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
+            if (highPriorityIndex != null) {
+                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
+            }
+            if (lowPriorityIndex != null) {
+                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
+            }
+        }
+        
+        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
+                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
+            for (Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx); iterator.hasNext();) {
+                Entry<Long, MessageKeys> entry = iterator.next();
+                if (entry.getKey().compareTo(sequenceId) <= 0) {
+                    // We don't do the actually delete while we are
+                    // iterating the BTree since
+                    // iterating would fail.
+                    deletes.add(entry);
+                } else {
+                    // no point in iterating the in-order sequences anymore
+                    break;
+                }
+            }
+        } 
+        
+        long getNextMessageId(int priority) {
+            return nextMessageId++;
+        }
+        
+        MessageKeys get(Transaction tx, Long key) throws IOException {
+            MessageKeys result = defaultPriorityIndex.get(tx, key);
+            if (result == null) {
+                result = highPriorityIndex.get(tx, key);
+                if (result == null) {
+                    result = lowPriorityIndex.get(tx, key);
+                }
+            }
+            return result;
+        }
+        
+        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
+            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
+                return defaultPriorityIndex.put(tx, key, value);
+            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
+                return highPriorityIndex.put(tx, key, value);
+            } else {
+                return lowPriorityIndex.put(tx, key, value);
+            }
+        }
+        
+        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
+            return new MessageOrderIterator(tx,cursor);
+        }
+        
+        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
+            return new MessageOrderIterator(tx,m);
+        }
+        
+        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
+            Iterator<Entry<Long, MessageKeys>>currentIterator;
+            final Iterator<Entry<Long, MessageKeys>>highIterator;
+            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
+            final Iterator<Entry<Long, MessageKeys>>lowIterator;
+            Long lastKey;
+            
+            
+
+            MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
+                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
+                if (highPriorityIndex != null) {
+                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
+                } else {
+                    this.highIterator = null;
+                }
+                if (lowPriorityIndex != null) {
+                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
+                } else {
+                    this.lowIterator = null;
+                }
+            }
+            
+            public boolean hasNext() {
+                if (currentIterator == null) {
+                    if (highIterator != null) {
+                        if (highIterator.hasNext()) {
+                            currentIterator = highIterator;
+                            return currentIterator.hasNext();
+                        }
+                        if (defaultIterator.hasNext()) {
+                            currentIterator = defaultIterator;
+                            return currentIterator.hasNext();
+                        }
+                        if (lowIterator.hasNext()) {
+                            currentIterator = lowIterator;
+                            return currentIterator.hasNext();
+                        }
+                        return false;
+                    } else {
+                        currentIterator = defaultIterator;
+                        return currentIterator.hasNext();
+                    }
+                }
+                if (highIterator != null) {
+                    if (currentIterator.hasNext()) {
+                        return true;
+                    }
+                    if (currentIterator == highIterator) {
+                        if (defaultIterator.hasNext()) {
+                            currentIterator = defaultIterator;
+                            return currentIterator.hasNext();
+                        }
+                        if (lowIterator.hasNext()) {
+                            currentIterator = lowIterator;
+                            return currentIterator.hasNext();
+                        }
+                        return false;
+                    }
+                    if (currentIterator == defaultIterator) {
+                        if (lowIterator.hasNext()) {
+                            currentIterator = lowIterator;
+                            return currentIterator.hasNext();
+                        }
+                        return false;
+                    }
+                }
+                return currentIterator.hasNext();
+            }
+
+            public Entry<Long, MessageKeys> next() {
+                Entry<Long, MessageKeys> result = currentIterator.next();
+                if (result != null) {
+                    Long key = result.getKey();
+                    if (highIterator != null) {
+                        if (currentIterator == defaultIterator) {
+                            lastDefaultKey = key;
+                        } else if (currentIterator == highIterator) {
+                            lastHighKey = key;
+                        } else {
+                            lastLowKey = key;
+                        }
+                    } else {
+                        lastDefaultKey = key;
+                    }
+                }
+                return result;
+            }
+
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+           
+        }
+    }
+    
+    
+    
 }

Modified: activemq/trunk/activemq-core/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/proto/journal-data.proto?rev=980458&r1=980457&r2=980458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/proto/journal-data.proto (original)
+++ activemq/trunk/activemq-core/src/main/proto/journal-data.proto Thu Jul 29 14:52:03 2010
@@ -53,6 +53,8 @@ message KahaAddMessageCommand {
   required KahaDestination destination = 2;
   required string messageId = 3;
   required bytes message = 4;
+  optional int32 priority =5 [default = 4];
+  optional bool prioritySupported = 6;
 }
 
 message KahaRemoveMessageCommand {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java?rev=980458&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java Thu Jul 29 14:52:03 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadb;
+
+import junit.framework.Test;
+import org.apache.activemq.store.MessagePriorityTest;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+public class KahaDBMessagePriorityTest extends MessagePriorityTest {
+
+    @Override
+    protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setConcurrentStoreAndDispatchQueues(false);
+        adapter.setConcurrentStoreAndDispatchTopics(false);
+        adapter.deleteAllMessages();
+        return adapter;
+    }
+    
+    public static Test suite() {
+        return suite(KahaDBMessagePriorityTest.class);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java?rev=980458&r1=980457&r2=980458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java Thu Jul 29 14:52:03 2010
@@ -16,25 +16,20 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import java.io.File;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
 import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.util.IOHelper;
 
+import javax.jms.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.FileNotFoundException;
+
 /**
- * 
+ * @author chirino
  */
 public class KahaDBVersionTest extends TestCase {
 
@@ -49,11 +44,11 @@ public class KahaDBVersionTest extends T
 
     }
 
-    
-    public void testCreateStore() throws Exception {
+        
+    public void XtestCreateStore() throws Exception {
+        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
         File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
         IOHelper.deleteFile(dir);
-        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
         kaha.setDirectory(dir);
         kaha.setJournalMaxFileLength(1024*1024);
         BrokerService broker = createBroker(kaha);
@@ -76,17 +71,15 @@ public class KahaDBVersionTest extends T
             Message msg = session.createTextMessage("test message:"+i);
             producer.send(msg);
         }
-        connection.close();
+        connection.stop();
         broker.stop();
-
-        
-
     }
     
-    public void XtestVersionConversion() throws Exception{
+    public void testVersionConversion() throws Exception{
         File testDir = new File("target/activemq-data/kahadb/versionDB");
         IOHelper.deleteFile(testDir);
         IOHelper.copyFile(VERSION_1_DB, testDir);
+        
         KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
         kaha.setDirectory(testDir);
         kaha.setJournalMaxFileLength(1024*1024);
@@ -100,18 +93,21 @@ public class KahaDBVersionTest extends T
         Queue queue = session.createQueue("test.queue");
         MessageConsumer queueConsumer = session.createConsumer(queue);
         for (int i = 0; i < 1000; i++) {
-            TextMessage msg  = (TextMessage) queueConsumer.receive();
-            System.err.println(msg.getText());
+            TextMessage msg  = (TextMessage) queueConsumer.receive(10000);
+            //System.err.println(msg.getText());
+            assertNotNull(msg);
         }
         MessageConsumer topicConsumer = session.createDurableSubscriber(topic,"test");
         for (int i = 0; i < 1000; i++) {
-            TextMessage msg  = (TextMessage) topicConsumer.receive();
-            System.err.println(msg.getText());
+            TextMessage msg  = (TextMessage) topicConsumer.receive(10000);
+            //System.err.println(msg.getText());
+            assertNotNull(msg);
         }
         broker.stop();
+        
     }
 
-   
+