You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/07/29 16:52:04 UTC
svn commit: r980458 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/kahadb/ main/proto/
test/java/org/apache/activemq/store/kahadb/
Author: rajdavies
Date: Thu Jul 29 14:52:03 2010
New Revision: 980458
URL: http://svn.apache.org/viewvc?rev=980458&view=rev
Log:
changes for https://issues.apache.org/activemq/browse/AMQ-2789 - message priority
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/main/proto/journal-data.proto
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=980458&r1=980457&r2=980458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Jul 29 14:52:03 2010
@@ -98,7 +98,7 @@ public class KahaDBStore extends Message
protected ExecutorService topicExecutor;
protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
- private final WireFormat wireFormat = new OpenWireFormat();
+ final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
@@ -368,7 +368,8 @@ public class KahaDBStore extends Message
command.setDestination(dest);
command.setMessageId(message.getMessageId().toString());
command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
-
+ command.setPriority(message.getPriority());
+ command.setPrioritySupported(isPrioritizedMessages());
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
@@ -472,10 +473,12 @@ public class KahaDBStore extends Message
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
+ sd.orderIndex.resetCursorPosition();
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
- listener.recoverMessage(loadMessage(entry.getValue().location));
+ Message msg = loadMessage(entry.getValue().location);
+ listener.recoverMessage(msg);
}
}
});
@@ -484,8 +487,7 @@ public class KahaDBStore extends Message
}
}
- long cursorPos = 0;
-
+
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
indexLock.readLock().lock();
try {
@@ -494,19 +496,19 @@ public class KahaDBStore extends Message
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageKeys> entry = null;
int counter = 0;
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
.hasNext()
&& listener.hasSpace();) {
entry = iterator.next();
- listener.recoverMessage(loadMessage(entry.getValue().location));
+ Message msg = loadMessage(entry.getValue().location);
+ //System.err.println("RECOVER " + msg.getMessageId().getProducerSequenceId());
+ listener.recoverMessage(msg);
counter++;
if (counter >= maxReturned || listener.hasSpace() == false) {
break;
}
}
- if (entry != null) {
- cursorPos = entry.getKey() + 1;
- }
+ sd.orderIndex.stoppedIterating();
}
});
}finally {
@@ -515,7 +517,15 @@ public class KahaDBStore extends Message
}
public void resetBatching() {
- cursorPos = 0;
+ try {
+ pageFile.tx().execute(new Transaction.Closure<Exception>() {
+ public void execute(Transaction tx) throws Exception {
+ StoredDestination sd = getStoredDestination(dest, tx);
+ sd.orderIndex.resetCursorPosition();}
+ });
+ } catch (Exception e) {
+ LOG.error("Failed to reset batching",e);
+ }
}
@Override
@@ -527,21 +537,22 @@ public class KahaDBStore extends Message
// Hopefully one day the page file supports concurrent read
// operations... but for now we must
// externally synchronize...
- Long location;
+
indexLock.readLock().lock();
try {
- location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
- public Long execute(Transaction tx) throws IOException {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
- return sd.messageIdIndex.get(tx, key);
+ Long location = sd.messageIdIndex.get(tx, key);
+ if (location != null) {
+ sd.orderIndex.setBatch(tx, location);
+ }
}
});
}finally {
indexLock.readLock().unlock();
}
- if (location != null) {
- cursorPos = location + 1;
- }
+
} finally {
unlockAsyncJobQueue();
}
@@ -723,7 +734,7 @@ public class KahaDBStore extends Message
// The subscription might not exist.
return 0;
}
- cursorPos += 1;
+ MessageOrderCursor moc = new MessageOrderCursor(cursorPos + 1);
int counter = 0;
try {
@@ -732,7 +743,7 @@ public class KahaDBStore extends Message
if (selector != null) {
selectorExpression = SelectorParser.parse(selector);
}
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
if (selectorExpression != null) {
@@ -765,9 +776,8 @@ public class KahaDBStore extends Message
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
- cursorPos += 1;
-
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+ MessageOrderCursor moc = new MessageOrderCursor(cursorPos + 1);
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
listener.recoverMessage(loadMessage(entry.getValue().location));
@@ -787,15 +797,15 @@ public class KahaDBStore extends Message
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
- Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
- if (cursorPos == null) {
- cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
- cursorPos += 1;
+ MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
+ if (moc == null) {
+ long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
+ moc = new MessageOrderCursor(pos+1);
}
Entry<Long, MessageKeys> entry = null;
int counter = 0;
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
.hasNext();) {
entry = iterator.next();
if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
@@ -806,7 +816,9 @@ public class KahaDBStore extends Message
}
}
if (entry != null) {
- sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
+ MessageOrderCursor copy = sd.orderIndex.cursor.copy();
+ copy.increment();
+ sd.subscriptionCursors.put(subscriptionKey, copy);
}
}
});
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=980458&r1=980457&r2=980458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Jul 29 14:52:03 2010
@@ -94,6 +94,7 @@ public class MessageDatabase extends Ser
static final int CLOSED_STATE = 1;
static final int OPEN_STATE = 2;
static final long NOT_ACKED = -1;
+ static final int VERSION = 2;
protected class Metadata {
@@ -104,7 +105,7 @@ public class MessageDatabase extends Ser
protected Location firstInProgressTransactionLocation;
protected Location producerSequenceIdTrackerLocation = null;
protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
-
+ protected int version = VERSION;
public void read(DataInput is) throws IOException {
state = is.readInt();
destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
@@ -126,6 +127,12 @@ public class MessageDatabase extends Ser
}
} catch (EOFException expectedOnUpgrade) {
}
+ try {
+ version = is.readInt();
+ }catch (EOFException expectedOnUpgrade) {
+ version=1;
+ }
+ LOG.info("KahaDB is version " + version);
}
public void write(DataOutput os) throws IOException {
@@ -152,6 +159,9 @@ public class MessageDatabase extends Ser
} else {
os.writeBoolean(false);
}
+ if (version > 1) {
+ os.writeInt(version);
+ }
}
}
@@ -974,22 +984,26 @@ public class MessageDatabase extends Ser
}
// Add the message.
- long id = sd.nextMessageId++;
+ int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
+ long id = sd.orderIndex.getNextMessageId(priority);
Long previous = sd.locationIndex.put(tx, location, id);
- if( previous == null ) {
+ if (previous == null) {
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
- if( previous == null ) {
- sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+ if (previous == null) {
+ sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
} else {
- // If the message ID as indexed, then the broker asked us to store a DUP
- // message. Bad BOY! Don't do it, and log a warning.
- LOG.warn("Duplicate message add attempt rejected. Message id: "+command.getMessageId());
+ // If the message ID as indexed, then the broker asked us to
+ // store a DUP
+ // message. Bad BOY! Don't do it, and log a warning.
+ LOG.warn("Duplicate message add attempt rejected. Message id: " + command.getMessageId());
// TODO: consider just rolling back the tx.
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
}
} else {
- // restore the previous value.. Looks like this was a redo of a previously
- // added message. We don't want to assign it a new id as the other indexes would
+ // restore the previous value.. Looks like this was a redo of a
+ // previously
+ // added message. We don't want to assign it a new id as the other
+ // indexes would
// be wrong..
//
// TODO: consider just rolling back the tx.
@@ -1049,9 +1063,7 @@ public class MessageDatabase extends Ser
void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
- sd.orderIndex.clear(tx);
- sd.orderIndex.unload(tx);
- tx.free(sd.orderIndex.getPageId());
+ sd.orderIndex.remove(tx);
sd.locationIndex.clear(tx);
sd.locationIndex.unload(tx);
@@ -1085,7 +1097,7 @@ public class MessageDatabase extends Ser
sd.subscriptions.put(tx, subscriptionKey, command);
long ackLocation=NOT_ACKED;
if (!command.getRetroactive()) {
- ackLocation = sd.nextMessageId-1;
+ ackLocation = sd.orderIndex.nextMessageId-1;
}
sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
@@ -1273,17 +1285,17 @@ public class MessageDatabase extends Ser
LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
}
}
-
- static class StoredDestination {
- long nextMessageId;
- BTreeIndex<Long, MessageKeys> orderIndex;
+
+ class StoredDestination {
+
+ MessageOrderIndex orderIndex = new MessageOrderIndex();
BTreeIndex<Location, Long> locationIndex;
BTreeIndex<String, Long> messageIdIndex;
// These bits are only set for Topics
BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
BTreeIndex<String, Long> subscriptionAcks;
- HashMap<String, Long> subscriptionCursors;
+ HashMap<String, MessageOrderCursor> subscriptionCursors;
TreeMap<Long, HashSet<String>> ackPositions;
}
@@ -1291,7 +1303,7 @@ public class MessageDatabase extends Ser
public StoredDestination readPayload(DataInput dataIn) throws IOException {
StoredDestination value = new StoredDestination();
- value.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+ value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
@@ -1299,11 +1311,15 @@ public class MessageDatabase extends Ser
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
}
+ if (metadata.version >= 2) {
+ value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+ value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+ }
return value;
}
public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
- dataOut.writeLong(value.orderIndex.getPageId());
+ dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
dataOut.writeLong(value.locationIndex.getPageId());
dataOut.writeLong(value.messageIdIndex.getPageId());
if (value.subscriptions != null) {
@@ -1313,6 +1329,10 @@ public class MessageDatabase extends Ser
} else {
dataOut.writeBoolean(false);
}
+ if (metadata.version >= 2) {
+ dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
+ dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
+ }
}
}
@@ -1385,7 +1405,7 @@ public class MessageDatabase extends Ser
if (rc == null) {
// Brand new destination.. allocate indexes for it.
rc = new StoredDestination();
- rc.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+ rc.orderIndex.allocate(tx);
rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
@@ -1397,15 +1417,10 @@ public class MessageDatabase extends Ser
}
// Configure the marshalers and load.
- rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
- rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
rc.orderIndex.load(tx);
// Figure out the next key using the last entry in the destination.
- Entry<Long, MessageKeys> lastEntry = rc.orderIndex.getLast(tx);
- if( lastEntry!=null ) {
- rc.nextMessageId = lastEntry.getKey()+1;
- }
+ rc.orderIndex.configureLast(tx);
rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
@@ -1427,19 +1442,19 @@ public class MessageDatabase extends Ser
rc.subscriptionAcks.load(tx);
rc.ackPositions = new TreeMap<Long, HashSet<String>>();
- rc.subscriptionCursors = new HashMap<String, Long>();
+ rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
Entry<String, Long> entry = iterator.next();
addAckLocation(rc, entry.getValue(), entry.getKey());
}
- if (rc.nextMessageId == 0) {
+ if (rc.orderIndex.nextMessageId == 0) {
// check for existing durable sub all acked out - pull next seq from acks as messages are gone
if (!rc.ackPositions.isEmpty()) {
Long lastAckedMessageId = rc.ackPositions.lastKey();
if (lastAckedMessageId != NOT_ACKED) {
- rc.nextMessageId = lastAckedMessageId+1;
+ rc.orderIndex.nextMessageId = lastAckedMessageId+1;
}
}
}
@@ -1486,18 +1501,7 @@ public class MessageDatabase extends Ser
// Find all the entries that need to get deleted.
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
- Entry<Long, MessageKeys> entry = iterator.next();
- if (entry.getKey().compareTo(sequenceId) <= 0) {
- // We don't do the actually delete while we are
- // iterating the BTree since
- // iterating would fail.
- deletes.add(entry);
- }else {
- //no point in iterating the in-order sequences anymore
- break;
- }
- }
+ sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
// Do the actual deletes.
for (Entry<Long, MessageKeys> entry : deletes) {
@@ -1816,4 +1820,337 @@ public class MessageDatabase extends Ser
public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
this.databaseLockedWaitDelay = databaseLockedWaitDelay;
}
+
+
+ class MessageOrderCursor{
+ long defaultCursorPosition;
+ long lowPriorityCursorPosition;
+ long highPriorityCursorPosition;
+ MessageOrderCursor(){
+ }
+
+ MessageOrderCursor(long position){
+ this.defaultCursorPosition=position;
+ this.lowPriorityCursorPosition=position;
+ this.highPriorityCursorPosition=position;
+ }
+
+ MessageOrderCursor(MessageOrderCursor other){
+ this.defaultCursorPosition=other.defaultCursorPosition;
+ this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
+ this.highPriorityCursorPosition=other.highPriorityCursorPosition;
+ }
+
+ MessageOrderCursor copy() {
+ return new MessageOrderCursor(this);
+ }
+
+ void reset() {
+ this.defaultCursorPosition=0;
+ this.highPriorityCursorPosition=0;
+ this.lowPriorityCursorPosition=0;
+ }
+
+ void increment() {
+ if (defaultCursorPosition!=0) {
+ defaultCursorPosition++;
+ }
+ if (highPriorityCursorPosition!=0) {
+ highPriorityCursorPosition++;
+ }
+ if (lowPriorityCursorPosition!=0) {
+ lowPriorityCursorPosition++;
+ }
+ }
+ }
+
+ class MessageOrderIndex{
+ long nextMessageId;
+ BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
+ BTreeIndex<Long, MessageKeys> lowPriorityIndex;
+ BTreeIndex<Long, MessageKeys> highPriorityIndex;
+ MessageOrderCursor cursor = new MessageOrderCursor();
+ Long lastDefaultKey;
+ Long lastHighKey;
+ Long lastLowKey;
+
+
+ MessageKeys remove(Transaction tx, Long key) throws IOException {
+ MessageKeys result = defaultPriorityIndex.remove(tx, key);
+ if (result == null && highPriorityIndex!=null) {
+ result = highPriorityIndex.remove(tx, key);
+ if (result ==null && lowPriorityIndex!=null) {
+ result = lowPriorityIndex.remove(tx, key);
+ }
+ }
+ return result;
+ }
+
+ void load(Transaction tx) throws IOException {
+ defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+ defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+ defaultPriorityIndex.load(tx);
+ if (metadata.version >= 2) {
+ lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+ lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+ lowPriorityIndex.load(tx);
+
+ highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+ highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+ highPriorityIndex.load(tx);
+ }
+ }
+
+ void allocate(Transaction tx) throws IOException {
+ defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+ if (metadata.version >= 2) {
+ lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+ highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+ }
+ }
+
+ void configureLast(Transaction tx) throws IOException {
+ // Figure out the next key using the last entry in the destination.
+ if (highPriorityIndex != null) {
+ Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
+ if (lastEntry != null) {
+ nextMessageId = lastEntry.getKey() + 1;
+ } else {
+ lastEntry = defaultPriorityIndex.getLast(tx);
+ if (lastEntry != null) {
+ nextMessageId = lastEntry.getKey() + 1;
+ } else {
+ lastEntry = lowPriorityIndex.getLast(tx);
+ if (lastEntry != null) {
+ nextMessageId = lastEntry.getKey() + 1;
+ }
+ }
+ }
+ } else {
+ Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
+ if (lastEntry != null) {
+ nextMessageId = lastEntry.getKey() + 1;
+ }
+ }
+ }
+
+
+ void remove(Transaction tx) throws IOException {
+ defaultPriorityIndex.clear(tx);
+ defaultPriorityIndex.unload(tx);
+ tx.free(defaultPriorityIndex.getPageId());
+ if (lowPriorityIndex != null) {
+ lowPriorityIndex.clear(tx);
+ lowPriorityIndex.unload(tx);
+
+ tx.free(lowPriorityIndex.getPageId());
+ }
+ if (highPriorityIndex != null) {
+ highPriorityIndex.clear(tx);
+ highPriorityIndex.unload(tx);
+ tx.free(highPriorityIndex.getPageId());
+ }
+ }
+
+ void resetCursorPosition() {
+ this.cursor.reset();
+ lastDefaultKey = null;
+ lastHighKey = null;
+ lastLowKey = null;
+ }
+
+ void setBatch(Transaction tx, Long sequence) throws IOException {
+ if (sequence != null) {
+ Long nextPosition = new Long(sequence.longValue() + 1);
+ if (defaultPriorityIndex.containsKey(tx, sequence)) {
+ lastDefaultKey = nextPosition;
+ cursor.defaultCursorPosition = nextPosition.longValue();
+ } else if (highPriorityIndex != null) {
+ if (highPriorityIndex.containsKey(tx, sequence)) {
+ lastHighKey = nextPosition;
+ cursor.highPriorityCursorPosition = nextPosition.longValue();
+ } else if (lowPriorityIndex.containsKey(tx, sequence)) {
+ lastLowKey = nextPosition;
+ cursor.lowPriorityCursorPosition = nextPosition.longValue();
+ }
+ } else {
+ lastDefaultKey = nextPosition;
+ cursor.defaultCursorPosition = nextPosition.longValue();
+ }
+ }
+ }
+
+ void stoppedIterating() {
+ if (lastDefaultKey!=null) {
+ cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
+ }
+ if (lastHighKey!=null) {
+ cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
+ }
+ if (lastLowKey!=null) {
+ cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
+ }
+ lastDefaultKey = null;
+ lastHighKey = null;
+ lastLowKey = null;
+ }
+
+ void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
+ throws IOException {
+ getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
+ if (highPriorityIndex != null) {
+ getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
+ }
+ if (lowPriorityIndex != null) {
+ getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
+ }
+ }
+
+ void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
+ BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
+ for (Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx); iterator.hasNext();) {
+ Entry<Long, MessageKeys> entry = iterator.next();
+ if (entry.getKey().compareTo(sequenceId) <= 0) {
+ // We don't do the actually delete while we are
+ // iterating the BTree since
+ // iterating would fail.
+ deletes.add(entry);
+ } else {
+ // no point in iterating the in-order sequences anymore
+ break;
+ }
+ }
+ }
+
+ long getNextMessageId(int priority) {
+ return nextMessageId++;
+ }
+
+ MessageKeys get(Transaction tx, Long key) throws IOException {
+ MessageKeys result = defaultPriorityIndex.get(tx, key);
+ if (result == null) {
+ result = highPriorityIndex.get(tx, key);
+ if (result == null) {
+ result = lowPriorityIndex.get(tx, key);
+ }
+ }
+ return result;
+ }
+
+ MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
+ if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
+ return defaultPriorityIndex.put(tx, key, value);
+ } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
+ return highPriorityIndex.put(tx, key, value);
+ } else {
+ return lowPriorityIndex.put(tx, key, value);
+ }
+ }
+
+ Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
+ return new MessageOrderIterator(tx,cursor);
+ }
+
+ Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
+ return new MessageOrderIterator(tx,m);
+ }
+
+ class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
+ Iterator<Entry<Long, MessageKeys>>currentIterator;
+ final Iterator<Entry<Long, MessageKeys>>highIterator;
+ final Iterator<Entry<Long, MessageKeys>>defaultIterator;
+ final Iterator<Entry<Long, MessageKeys>>lowIterator;
+ Long lastKey;
+
+
+
+ MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
+ this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
+ if (highPriorityIndex != null) {
+ this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
+ } else {
+ this.highIterator = null;
+ }
+ if (lowPriorityIndex != null) {
+ this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
+ } else {
+ this.lowIterator = null;
+ }
+ }
+
+ public boolean hasNext() {
+ if (currentIterator == null) {
+ if (highIterator != null) {
+ if (highIterator.hasNext()) {
+ currentIterator = highIterator;
+ return currentIterator.hasNext();
+ }
+ if (defaultIterator.hasNext()) {
+ currentIterator = defaultIterator;
+ return currentIterator.hasNext();
+ }
+ if (lowIterator.hasNext()) {
+ currentIterator = lowIterator;
+ return currentIterator.hasNext();
+ }
+ return false;
+ } else {
+ currentIterator = defaultIterator;
+ return currentIterator.hasNext();
+ }
+ }
+ if (highIterator != null) {
+ if (currentIterator.hasNext()) {
+ return true;
+ }
+ if (currentIterator == highIterator) {
+ if (defaultIterator.hasNext()) {
+ currentIterator = defaultIterator;
+ return currentIterator.hasNext();
+ }
+ if (lowIterator.hasNext()) {
+ currentIterator = lowIterator;
+ return currentIterator.hasNext();
+ }
+ return false;
+ }
+ if (currentIterator == defaultIterator) {
+ if (lowIterator.hasNext()) {
+ currentIterator = lowIterator;
+ return currentIterator.hasNext();
+ }
+ return false;
+ }
+ }
+ return currentIterator.hasNext();
+ }
+
+ public Entry<Long, MessageKeys> next() {
+ Entry<Long, MessageKeys> result = currentIterator.next();
+ if (result != null) {
+ Long key = result.getKey();
+ if (highIterator != null) {
+ if (currentIterator == defaultIterator) {
+ lastDefaultKey = key;
+ } else if (currentIterator == highIterator) {
+ lastHighKey = key;
+ } else {
+ lastLowKey = key;
+ }
+ } else {
+ lastDefaultKey = key;
+ }
+ }
+ return result;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+ }
+
+
+
}
Modified: activemq/trunk/activemq-core/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/proto/journal-data.proto?rev=980458&r1=980457&r2=980458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/proto/journal-data.proto (original)
+++ activemq/trunk/activemq-core/src/main/proto/journal-data.proto Thu Jul 29 14:52:03 2010
@@ -53,6 +53,8 @@ message KahaAddMessageCommand {
required KahaDestination destination = 2;
required string messageId = 3;
required bytes message = 4;
+ optional int32 priority =5 [default = 4];
+ optional bool prioritySupported = 6;
}
message KahaRemoveMessageCommand {
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java?rev=980458&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java Thu Jul 29 14:52:03 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadb;
+
+import junit.framework.Test;
+import org.apache.activemq.store.MessagePriorityTest;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+public class KahaDBMessagePriorityTest extends MessagePriorityTest {
+
+ @Override
+ protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setConcurrentStoreAndDispatchQueues(false);
+ adapter.setConcurrentStoreAndDispatchTopics(false);
+ adapter.deleteAllMessages();
+ return adapter;
+ }
+
+ public static Test suite() {
+ return suite(KahaDBMessagePriorityTest.class);
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java?rev=980458&r1=980457&r2=980458&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java Thu Jul 29 14:52:03 2010
@@ -16,25 +16,20 @@
*/
package org.apache.activemq.store.kahadb;
-import java.io.File;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.IOHelper;
+import javax.jms.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.FileNotFoundException;
+
/**
- *
+ * @author chirino
*/
public class KahaDBVersionTest extends TestCase {
@@ -49,11 +44,11 @@ public class KahaDBVersionTest extends T
}
-
- public void testCreateStore() throws Exception {
+
+ public void XtestCreateStore() throws Exception {
+ KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
IOHelper.deleteFile(dir);
- KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
kaha.setDirectory(dir);
kaha.setJournalMaxFileLength(1024*1024);
BrokerService broker = createBroker(kaha);
@@ -76,17 +71,15 @@ public class KahaDBVersionTest extends T
Message msg = session.createTextMessage("test message:"+i);
producer.send(msg);
}
- connection.close();
+ connection.stop();
broker.stop();
-
-
-
}
- public void XtestVersionConversion() throws Exception{
+ public void testVersionConversion() throws Exception{
File testDir = new File("target/activemq-data/kahadb/versionDB");
IOHelper.deleteFile(testDir);
IOHelper.copyFile(VERSION_1_DB, testDir);
+
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
kaha.setDirectory(testDir);
kaha.setJournalMaxFileLength(1024*1024);
@@ -100,18 +93,21 @@ public class KahaDBVersionTest extends T
Queue queue = session.createQueue("test.queue");
MessageConsumer queueConsumer = session.createConsumer(queue);
for (int i = 0; i < 1000; i++) {
- TextMessage msg = (TextMessage) queueConsumer.receive();
- System.err.println(msg.getText());
+ TextMessage msg = (TextMessage) queueConsumer.receive(10000);
+ //System.err.println(msg.getText());
+ assertNotNull(msg);
}
MessageConsumer topicConsumer = session.createDurableSubscriber(topic,"test");
for (int i = 0; i < 1000; i++) {
- TextMessage msg = (TextMessage) topicConsumer.receive();
- System.err.println(msg.getText());
+ TextMessage msg = (TextMessage) topicConsumer.receive(10000);
+ //System.err.println(msg.getText());
+ assertNotNull(msg);
}
broker.stop();
+
}
-
+