You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/25 01:08:37 UTC
svn commit: r1401911 - in /activemq/trunk/activemq-leveldb/src:
main/scala/org/apache/activemq/leveldb/
test/scala/org/apache/activemq/leveldb/
Author: chirino
Date: Wed Oct 24 23:08:37 2012
New Revision: 1401911
URL: http://svn.apache.org/viewvc?rev=1401911&view=rev
Log:
Implementing AMQ-4134: Add XA support to the LevelDB store.
More test cases are now working. Only the durable sub XA cases are broken at this point.
Modified:
activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java
Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1401911&r1=1401910&r2=1401911&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Wed Oct 24 23:08:37 2012
@@ -21,7 +21,7 @@ import org.fusesource.hawtdispatch._
import org.fusesource.hawtdispatch.BaseRetained
import java.util.concurrent._
import atomic._
-import org.fusesource.hawtbuf.Buffer
+import org.fusesource.hawtbuf.{DataByteArrayOutputStream, DataByteArrayInputStream, ByteArrayOutputStream, Buffer}
import org.apache.activemq.store.MessageRecoveryListener
import java.lang.ref.WeakReference
import scala.Option._
@@ -41,7 +41,7 @@ case class QueueEntryRecord(id:MessageId
case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
case class QueueEntryRange()
case class SubAckRecord(subKey:Long, ackPosition:Long)
-case class XaAckRecord(container:Long, seq:Long, ack:Buffer)
+case class XaAckRecord(container:Long, seq:Long, ack:MessageAck)
sealed trait UowState {
def stage:Int
@@ -130,7 +130,6 @@ class DelayableUOW(val manager:DBManager
val uowId:Int = manager.lastUowId.incrementAndGet()
var actions = Map[MessageId, MessageAction]()
var subAcks = ListBuffer[SubAckRecord]()
- var xaAcks = ListBuffer[XaAckRecord]()
var completed = false
var disableDelay = false
var delayableActions = 0
@@ -147,25 +146,26 @@ class DelayableUOW(val manager:DBManager
def syncNeeded = syncFlag || actions.find( _._2.syncNeeded ).isDefined
def size = 100+actions.foldLeft(0L){ case (sum, entry) =>
sum + (entry._2.size+100)
- } + (subAcks.size * 100) + xaAcks.foldLeft(0L){ case (sum, entry) =>
- sum + entry.ack.length
- }
+ } + (subAcks.size * 100)
class MessageAction {
var id:MessageId = _
var messageRecord: MessageRecord = null
var enqueues = ListBuffer[QueueEntryRecord]()
var dequeues = ListBuffer[QueueEntryRecord]()
+ var xaAcks = ListBuffer[XaAckRecord]()
def uow = DelayableUOW.this
- def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
+ def isEmpty() = messageRecord==null && enqueues.isEmpty && dequeues.isEmpty && xaAcks.isEmpty
def cancel() = {
uow.rm(id)
}
def syncNeeded = messageRecord!=null && messageRecord.syncNeeded
- def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50)
+ def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50) + xaAcks.foldLeft(0L){ case (sum, entry) =>
+ sum + 100
+ }
def addToPendingStore() = {
var set = manager.pendingStores.get(id)
@@ -222,8 +222,10 @@ class DelayableUOW(val manager:DBManager
}
def xaAck(container:Long, seq:Long, ack:MessageAck) = {
- var packet = manager.parent.wireFormat.marshal(ack)
- xaAcks += XaAckRecord(container, seq, new Buffer(packet.data, packet.offset, packet.length))
+ this.synchronized {
+ getAction(ack.getLastMessageId).xaAcks+=(XaAckRecord(container, seq, ack))
+ }
+ countDownFuture
}
def enqueue(queueKey:Long, queueSeq:Long, message:Message, delay_enqueue:Boolean) = {
@@ -641,12 +643,10 @@ class DBManager(val parent:LevelDBStore)
def getXAActions(key:Long) = {
val msgs = ListBuffer[Message]()
val acks = ListBuffer[MessageAck]()
- println("transactionCursor")
client.transactionCursor(key) { command =>
- println("recovered command: "+command)
command match {
case message:Message => msgs += message
- case ack:MessageAck => acks += ack
+ case record:XaAckRecord => acks += record.ack
}
true
}
Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1401911&r1=1401910&r2=1401911&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Wed Oct 24 23:08:37 2012
@@ -944,13 +944,21 @@ class LevelDBClient(store: LevelDBStore)
}
}
- def transactionCursor(collectionKey: Long)(func: (DataStructure)=>Boolean) = {
+ def transactionCursor(collectionKey: Long)(func: (AnyRef)=>Boolean) = {
collectionCursor(collectionKey, encodeLong(0)) { (key, value) =>
val seq = decodeLong(key)
if( value.getMeta != null ) {
- val data = value.getMeta
- val ack = store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[MessageAck].asInstanceOf[MessageAck]
- func(ack)
+
+ val is = new DataByteArrayInputStream(value.getMeta);
+ val log = is.readLong()
+ val offset = is.readInt()
+ val qid = is.readLong()
+ val seq = is.readLong()
+ val ack = store.wireFormat.unmarshal(is).asInstanceOf[MessageAck]
+ ack.getLastMessageId.setDataLocator((log, offset))
+ ack.getLastMessageId.setEntryLocator((qid, seq))
+
+ func(XaAckRecord(collectionKey, seq, ack))
} else {
var locator = (value.getValueLocation, value.getValueLength)
val msg = getMessage(locator)
@@ -1068,7 +1076,7 @@ class LevelDBClient(store: LevelDBStore)
dataLocator = entry.id.getDataLocator match {
case x:(Long, Int) => x
case x:MessageRecord => x.locator
- case _ => throw new RuntimeException("Unexpected locator type")
+ case _ => throw new RuntimeException("Unexpected locator type: "+dataLocator)
}
}
@@ -1126,18 +1134,37 @@ class LevelDBClient(store: LevelDBStore)
write_enqueue_total += System.nanoTime() - start
}
- }
+ action.xaAcks.foreach { entry =>
+ val ack = entry.ack
+ if( dataLocator==null ) {
+ dataLocator = ack.getLastMessageId.getDataLocator match {
+ case x:(Long, Int) => x
+ case x:MessageRecord => x.locator
+ case _ =>
+ throw new RuntimeException("Unexpected locator type")
+ }
+ }
+
+ val (qid, seq) = ack.getLastMessageId.getEntryLocator.asInstanceOf[(Long, Long)];
+ val os = new DataByteArrayOutputStream()
+ os.writeLong(dataLocator._1)
+ os.writeInt(dataLocator._2)
+ os.writeLong(qid)
+ os.writeLong(seq)
+ store.wireFormat.marshal(ack, os)
+ var ack_encoded = os.toBuffer
+
+ val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
+ val log_record = new EntryRecord.Bean()
+ log_record.setCollectionKey(entry.container)
+ log_record.setEntryKey(new Buffer(key, 9, 8))
+ log_record.setMeta(ack_encoded)
+ appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+ val index_record = new EntryRecord.Bean()
+ index_record.setMeta(ack_encoded)
+ batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
+ }
- uow.xaAcks.foreach { entry =>
- val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
- val log_record = new EntryRecord.Bean()
- log_record.setCollectionKey(entry.container)
- log_record.setEntryKey(new Buffer(key, 9, 8))
- log_record.setMeta(entry.ack)
- appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
- val index_record = new EntryRecord.Bean()
- index_record.setValue(entry.ack)
- batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
}
uow.subAcks.foreach { entry =>
Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1401911&r1=1401910&r2=1401911&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Wed Oct 24 23:08:37 2012
@@ -229,7 +229,9 @@ class LevelDBStore extends ServiceSuppor
for ( ack <- acks ) {
// think we might have store design issue /w XA transactions and durable sub acks.
// does it even work for the other stores?
- transaction.remove(createMessageStore(ack.getDestination), ack);
+ var store = createMessageStore(ack.getDestination)
+ store.preparedAcks.add(ack.getLastMessageId)
+ transaction.remove(store, ack);
}
}
debug("started")
@@ -322,16 +324,25 @@ class LevelDBStore extends ServiceSuppor
def remove(store:LevelDBStore#LevelDBMessageStore, ack:MessageAck) = {
commitActions += new TransactionAction() {
+
def commit(uow:DelayableUOW) = {
store.doRemove(uow, ack.getLastMessageId)
+ if( prepared ) {
+ store.preparedAcks.remove(ack.getLastMessageId)
+ }
}
+
def prepare(uow:DelayableUOW) = {
// add it to the xa container instead of the actual store container.
uow.xaAck(xacontainer_id, xaseqcounter.incrementAndGet, ack)
xarecovery._2 += ack
+ store.preparedAcks.add(ack.getLastMessageId)
}
def rollback(uow: DelayableUOW) {
+ if( prepared ) {
+ store.preparedAcks.remove(ack.getLastMessageId)
+ }
}
}
}
@@ -380,6 +391,15 @@ class LevelDBStore extends ServiceSuppor
println("The transaction does not exist")
case Some(tx)=>
if( tx.prepared ) {
+ val done = new CountDownLatch(1)
+ withUow { uow =>
+ for( action <- tx.commitActions ) {
+ action.rollback(uow)
+ }
+ uow.syncFlag = true
+ uow.addCompleteListener { done.countDown() }
+ }
+ done.await()
db.removeTransactionContainer(tx.xacontainer_id)
}
}
@@ -394,12 +414,18 @@ class LevelDBStore extends ServiceSuppor
}
}
+ var doingRecover = false
def recover(listener: TransactionRecoveryListener) = {
- for ( (txid, transaction) <- transactions ) {
- if( transaction.prepared ) {
- val (msgs, acks) = transaction.xarecovery
- listener.recover(txid.asInstanceOf[XATransactionId], msgs.toArray, acks.toArray);
+ this.doingRecover = true
+ try {
+ for ( (txid, transaction) <- transactions ) {
+ if( transaction.prepared ) {
+ val (msgs, acks) = transaction.xarecovery
+ listener.recover(txid.asInstanceOf[XATransactionId], msgs.toArray, acks.toArray);
+ }
}
+ } finally {
+ this.doingRecover = false
}
}
@@ -490,6 +516,7 @@ class LevelDBStore extends ServiceSuppor
protected val lastSeq: AtomicLong = new AtomicLong(0)
protected var cursorPosition: Long = 0
+ val preparedAcks = new HashSet[MessageId]()
lastSeq.set(db.getLastQueueEntrySeq(key))
@@ -556,7 +583,25 @@ class LevelDBStore extends ServiceSuppor
}
def recover(listener: MessageRecoveryListener): Unit = {
- cursorPosition = db.cursorMessages(key, listener, 0)
+ cursorPosition = db.cursorMessages(key, preparedExcluding(listener), 0)
+ }
+
+ def preparedExcluding(listener: MessageRecoveryListener) = new MessageRecoveryListener {
+ def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
+ def hasSpace = listener.hasSpace
+ def recoverMessageReference(ref: MessageId) = {
+ if (!preparedAcks.contains(ref)) {
+ listener.recoverMessageReference(ref)
+ }
+ true
+ }
+
+ def recoverMessage(message: Message) = {
+ if (!preparedAcks.contains(message.getMessageId)) {
+ listener.recoverMessage(message)
+ }
+ true
+ }
}
def resetBatching: Unit = {
@@ -564,7 +609,7 @@ class LevelDBStore extends ServiceSuppor
}
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
- cursorPosition = db.cursorMessages(key, LimitingRecoveryListener(maxReturned, listener), cursorPosition)
+ cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), cursorPosition)
}
override def setBatch(id: MessageId): Unit = {
@@ -697,7 +742,7 @@ class LevelDBStore extends ServiceSuppor
def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
lookup(clientId, subscriptionName).foreach { sub =>
- sub.cursorPosition = db.cursorMessages(key, LimitingRecoveryListener(maxReturned, listener), sub.cursorPosition.max(sub.lastAckPosition+1))
+ sub.cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1))
}
}
Modified: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java?rev=1401911&r1=1401910&r2=1401911&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java Wed Oct 24 23:08:37 2012
@@ -27,21 +27,15 @@ public class LevelDBXARecoveryBrokerTest
broker.setPersistenceAdapter(store);
}
- // TODO: The following test cases are failing...
-
- @Override
- public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception {
- }
-
- @Override
- public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
- }
@Override
public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
+// XA Durable Subs not yet implemented
+// super.testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback();
}
-
@Override
public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
+// XA Durable Subs not yet implemented
+// super.testTopicPersistentPreparedAcksAvailableAfterRollback();
}
}