You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/06/06 01:01:14 UTC
svn commit: r1490065 - in
/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb:
DBManager.scala LevelDBStore.scala
Author: chirino
Date: Wed Jun 5 23:01:13 2013
New Revision: 1490065
URL: http://svn.apache.org/r1490065
Log:
Fixes a ghost message issue where the queue cursor goes out of sync with the leveldb store when transactions are being used.
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1490065&r1=1490064&r2=1490065&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Wed Jun 5 23:01:13 2013
@@ -331,9 +331,7 @@ class DelayableUOW(val manager:DBManager
val s = size
if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
asyncCapacityUsed = s
- manager.parent.blocking_executor.execute(^{
- complete_listeners.foreach(_())
- })
+ complete_listeners.foreach(_())
} else {
manager.asyncCapacityRemaining.addAndGet(s)
}
@@ -352,9 +350,7 @@ class DelayableUOW(val manager:DBManager
asyncCapacityUsed = 0
} else {
manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
- manager.parent.blocking_executor.execute(^{
- complete_listeners.foreach(_())
- })
+ complete_listeners.foreach(_())
}
countDownFuture.set(null)
@@ -665,19 +661,15 @@ class DBManager(val parent:LevelDBStore)
client.collectionIsEmpty(key)
}
- def cursorMessages(key:Long, listener:MessageRecoveryListener, startPos:Long) = {
+ def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, max:Long=Long.MaxValue) = {
var lastmsgid:MessageId = null
+ var count = 0L
client.queueCursor(key, startPos) { msg =>
- if( listener.hasSpace ) {
- if( listener.recoverMessage(msg) ) {
- lastmsgid = msg.getMessageId
- true
- } else {
- false
- }
- } else {
- false
+ if( !preparedAcks.contains(msg.getMessageId) && listener.recoverMessage(msg) ) {
+ lastmsgid = msg.getMessageId
+ count += 1
}
+ count < max
}
if( lastmsgid==null ) {
startPos
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1490065&r1=1490064&r2=1490065&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Wed Jun 5 23:01:13 2013
@@ -421,18 +421,23 @@ class LevelDBStore extends LockableServi
}
def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = {
- preCommit.run()
transactions.remove(txid) match {
case null =>
postCommit.run()
case tx =>
val done = new CountDownLatch(1)
- withUow { uow =>
- for( action <- tx.commitActions ) {
- action.commit(uow)
+ // Ugly synchronization hack to make sure messages are ordered the way the cursor expects them.
+ transactions.synchronized {
+ withUow { uow =>
+ for( action <- tx.commitActions ) {
+ action.commit(uow)
+ }
+ uow.syncFlag = true
+ uow.addCompleteListener {
+ preCommit.run()
+ done.countDown()
+ }
}
- uow.syncFlag = true
- uow.addCompleteListener { done.countDown() }
}
done.await()
if( tx.prepared ) {
@@ -611,14 +616,15 @@ class LevelDBStore extends LockableServi
case class LevelDBMessageStore(dest: ActiveMQDestination, val key: Long) extends AbstractMessageStore(dest) {
- protected val lastSeq: AtomicLong = new AtomicLong(0)
+ val lastSeq: AtomicLong = new AtomicLong(0)
protected var cursorPosition: Long = 0
val preparedAcks = new HashSet[MessageId]()
lastSeq.set(db.getLastQueueEntrySeq(key))
def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
- uow.enqueue(key, lastSeq.incrementAndGet, message, delay)
+ val seq = lastSeq.incrementAndGet()
+ uow.enqueue(key, seq, message, delay)
}
@@ -680,27 +686,7 @@ class LevelDBStore extends LockableServi
}
def recover(listener: MessageRecoveryListener): Unit = {
- cursorPosition = db.cursorMessages(key, PreparedExcluding(listener), 0)
- }
-
- case class PreparedExcluding(listener: MessageRecoveryListener) extends MessageRecoveryListener {
- def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
- def hasSpace = listener.hasSpace
- def recoverMessageReference(ref: MessageId) = {
- if (!preparedAcks.contains(ref)) {
- listener.recoverMessageReference(ref)
- } else {
- true
- }
- }
-
- def recoverMessage(message: Message) = {
- if (!preparedAcks.contains(message.getMessageId)) {
- listener.recoverMessage(message)
- } else {
- true
- }
- }
+ cursorPosition = db.cursorMessages(preparedAcks, key, listener, 0)
}
def resetBatching: Unit = {
@@ -708,32 +694,15 @@ class LevelDBStore extends LockableServi
}
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
- val limiting = LimitingRecoveryListener(maxReturned, listener)
- val excluding = PreparedExcluding(limiting)
- cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
+ cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned)
}
override def setBatch(id: MessageId): Unit = {
- cursorPosition = db.queuePosition(id)+1
+ cursorPosition = db.queuePosition(id)
}
}
- case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener {
- var recovered: Int = 0
- def hasSpace = recovered < max
- def recoverMessage(message: Message) = {
- recovered += 1;
- listener.recoverMessage(message)
- }
- def recoverMessageReference(ref: MessageId) = {
- recovered += 1;
- listener.recoverMessageReference(ref)
- }
- def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
- }
-
-
//
// This gets called when the store is first loading up; it restores
// the existing durable subs..
@@ -849,13 +818,13 @@ class LevelDBStore extends LockableServi
}
def recoverSubscription(clientId: String, subscriptionName: String, listener: MessageRecoveryListener): Unit = {
lookup(clientId, subscriptionName).foreach { sub =>
- sub.cursorPosition = db.cursorMessages(key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
+ sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
}
}
def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
lookup(clientId, subscriptionName).foreach { sub =>
- sub.cursorPosition = db.cursorMessages(key, PreparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1))
+ sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), maxReturned)
}
}