You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/16 15:50:35 UTC
svn commit: r1483369 -
/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Author: chirino
Date: Thu May 16 13:50:35 2013
New Revision: 1483369
URL: http://svn.apache.org/r1483369
Log:
Additional fix for AMQ-4535: It seems the store was getting out of sync with the cursor due to thread-unsafe access in the LevelDB store.
Switched to a ConcurrentHashMap to track the transactions, since that map is accessed by multiple threads.
Modified:
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1483369&r1=1483368&r2=1483369&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Thu May 16 13:50:35 2013
@@ -230,6 +230,7 @@ class LevelDBStore extends LockableServi
db.loadCollections
// Finish recovering the prepared XA transactions.
+ import collection.JavaConversions._
for( (txid, transaction) <- transactions ) {
assert( transaction.xacontainer_id != -1 )
val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
@@ -290,7 +291,7 @@ class LevelDBStore extends LockableServi
def createTransactionStore = this
- val transactions = collection.mutable.HashMap[TransactionId, Transaction]()
+ val transactions = new ConcurrentHashMap[TransactionId, Transaction]()
trait TransactionAction {
def commit(uow:DelayableUOW):Unit
@@ -395,14 +396,24 @@ class LevelDBStore extends LockableServi
}
}
- def transaction(txid: TransactionId) = transactions.getOrElseUpdate(txid, Transaction(txid))
+ def transaction(txid: TransactionId) = {
+ var rc = transactions.get(txid)
+ if( rc == null ) {
+ rc = Transaction(txid)
+ val prev = transactions.putIfAbsent(txid, rc)
+ if (prev!=null) {
+ rc = prev
+ }
+ }
+ rc
+ }
def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = {
preCommit.run()
transactions.remove(txid) match {
- case None=>
+ case null =>
postCommit.run()
- case Some(tx)=>
+ case tx =>
val done = new CountDownLatch(1)
withUow { uow =>
for( action <- tx.commitActions ) {
@@ -421,9 +432,9 @@ class LevelDBStore extends LockableServi
def rollback(txid: TransactionId) = {
transactions.remove(txid) match {
- case None=>
+ case null =>
println("The transaction does not exist")
- case Some(tx)=>
+ case tx =>
if( tx.prepared ) {
val done = new CountDownLatch(1)
withUow { uow =>
@@ -441,9 +452,9 @@ class LevelDBStore extends LockableServi
def prepare(tx: TransactionId) = {
transactions.get(tx) match {
- case None=>
+ case null =>
println("The transaction does not exist")
- case Some(tx)=>
+ case tx =>
tx.prepare
}
}
@@ -452,6 +463,7 @@ class LevelDBStore extends LockableServi
def recover(listener: TransactionRecoveryListener) = {
this.doingRecover = true
try {
+ import collection.JavaConversions._
for ( (txid, transaction) <- transactions ) {
if( transaction.prepared ) {
val (msgs, acks) = transaction.xarecovery