You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/25 04:48:50 UTC
svn commit: r1401956 - in /activemq/trunk/activemq-leveldb/src:
main/scala/org/apache/activemq/leveldb/
test/scala/org/apache/activemq/leveldb/
Author: chirino
Date: Thu Oct 25 02:48:50 2012
New Revision: 1401956
URL: http://svn.apache.org/viewvc?rev=1401956&view=rev
Log:
Implementing AMQ-4134: Add XA support to the LevelDB store.
All XA tests are now working.
Modified:
activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java
Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1401956&r1=1401955&r2=1401956&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Thu Oct 25 02:48:50 2012
@@ -41,7 +41,7 @@ case class QueueEntryRecord(id:MessageId
case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
case class QueueEntryRange()
case class SubAckRecord(subKey:Long, ackPosition:Long)
-case class XaAckRecord(container:Long, seq:Long, ack:MessageAck)
+case class XaAckRecord(container:Long, seq:Long, ack:MessageAck, sub:Long = -1)
sealed trait UowState {
def stage:Int
@@ -217,13 +217,13 @@ class DelayableUOW(val manager:DBManager
}
}
- def updateAckPosition(sub:DurableSubscription) = {
- subAcks += SubAckRecord(sub.subKey, sub.lastAckPosition)
+ def updateAckPosition(sub_key:Long, ack_seq:Long) = {
+ subAcks += SubAckRecord(sub_key, ack_seq)
}
- def xaAck(container:Long, seq:Long, ack:MessageAck) = {
+ def xaAck(record:XaAckRecord) = {
this.synchronized {
- getAction(ack.getLastMessageId).xaAcks+=(XaAckRecord(container, seq, ack))
+ getAction(record.ack.getLastMessageId).xaAcks+=record
}
countDownFuture
}
@@ -642,11 +642,11 @@ class DBManager(val parent:LevelDBStore)
def getXAActions(key:Long) = {
val msgs = ListBuffer[Message]()
- val acks = ListBuffer[MessageAck]()
+ val acks = ListBuffer[XaAckRecord]()
client.transactionCursor(key) { command =>
command match {
case message:Message => msgs += message
- case record:XaAckRecord => acks += record.ack
+ case record:XaAckRecord => acks += record
}
true
}
Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1401956&r1=1401955&r2=1401956&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Thu Oct 25 02:48:50 2012
@@ -954,11 +954,12 @@ class LevelDBClient(store: LevelDBStore)
val offset = is.readInt()
val qid = is.readLong()
val seq = is.readLong()
+ val sub = is.readLong()
val ack = store.wireFormat.unmarshal(is).asInstanceOf[MessageAck]
ack.getLastMessageId.setDataLocator((log, offset))
ack.getLastMessageId.setEntryLocator((qid, seq))
- func(XaAckRecord(collectionKey, seq, ack))
+ func(XaAckRecord(collectionKey, seq, ack, sub))
} else {
var locator = (value.getValueLocation, value.getValueLength)
val msg = getMessage(locator)
@@ -1134,7 +1135,7 @@ class LevelDBClient(store: LevelDBStore)
write_enqueue_total += System.nanoTime() - start
}
- action.xaAcks.foreach { entry =>
+ action.xaAcks.foreach { entry:XaAckRecord =>
val ack = entry.ack
if( dataLocator==null ) {
dataLocator = ack.getLastMessageId.getDataLocator match {
@@ -1144,6 +1145,7 @@ class LevelDBClient(store: LevelDBStore)
throw new RuntimeException("Unexpected locator type")
}
}
+ println(dataLocator)
val (qid, seq) = ack.getLastMessageId.getEntryLocator.asInstanceOf[(Long, Long)];
val os = new DataByteArrayOutputStream()
@@ -1151,6 +1153,7 @@ class LevelDBClient(store: LevelDBStore)
os.writeInt(dataLocator._2)
os.writeLong(qid)
os.writeLong(seq)
+ os.writeLong(entry.sub)
store.wireFormat.marshal(ack, os)
var ack_encoded = os.toBuffer
Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1401956&r1=1401955&r2=1401956&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Thu Oct 25 02:48:50 2012
@@ -222,16 +222,24 @@ class LevelDBStore extends ServiceSuppor
for( (txid, transaction) <- transactions ) {
assert( transaction.xacontainer_id != -1 )
val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
- transaction.xarecovery = (msgs, acks)
+ transaction.xarecovery = (msgs, acks.map(_.ack))
for ( msg <- msgs ) {
transaction.add(createMessageStore(msg.getDestination), msg, false);
}
- for ( ack <- acks ) {
- // think we might have store design issue /w XA transactions and durable sub acks.
- // does it even work for the other stores?
+ for ( record <- acks ) {
+ var ack = record.ack
var store = createMessageStore(ack.getDestination)
- store.preparedAcks.add(ack.getLastMessageId)
- transaction.remove(store, ack);
+ if( record.sub == -1 ) {
+ store.preparedAcks.add(ack.getLastMessageId)
+ transaction.remove(store, ack);
+ } else {
+ val topic = store.asInstanceOf[LevelDBTopicMessageStore];
+ for ( sub <- topic.subscription_with_key(record.sub) ) {
+ val position = db.queuePosition(ack.getLastMessageId)
+ transaction.updateAckPosition( topic, sub, position, ack);
+ sub.lastAckPosition = position
+ }
+ }
}
}
debug("started")
@@ -334,7 +342,7 @@ class LevelDBStore extends ServiceSuppor
def prepare(uow:DelayableUOW) = {
// add it to the xa container instead of the actual store container.
- uow.xaAck(xacontainer_id, xaseqcounter.incrementAndGet, ack)
+ uow.xaAck(XaAckRecord(xacontainer_id, xaseqcounter.incrementAndGet, ack))
xarecovery._2 += ack
store.preparedAcks.add(ack.getLastMessageId)
}
@@ -347,14 +355,22 @@ class LevelDBStore extends ServiceSuppor
}
}
- def updateAckPosition(store:LevelDBStore#LevelDBTopicMessageStore, sub: DurableSubscription, position: Long) = {
+ def updateAckPosition(store:LevelDBStore#LevelDBTopicMessageStore, sub: DurableSubscription, position: Long, ack:MessageAck) = {
commitActions += new TransactionAction() {
+ var prev_position = sub.lastAckPosition
+
def commit(uow:DelayableUOW) = {
store.doUpdateAckPosition(uow, sub, position)
}
def prepare(uow:DelayableUOW) = {
+ prev_position = sub.lastAckPosition
+ sub.lastAckPosition = position
+ uow.xaAck(XaAckRecord(xacontainer_id, xaseqcounter.incrementAndGet, ack, sub.subKey))
}
def rollback(uow: DelayableUOW) {
+ if ( prepared ) {
+ sub.lastAckPosition = prev_position
+ }
}
}
}
@@ -393,7 +409,7 @@ class LevelDBStore extends ServiceSuppor
if( tx.prepared ) {
val done = new CountDownLatch(1)
withUow { uow =>
- for( action <- tx.commitActions ) {
+ for( action <- tx.commitActions.reverse ) {
action.rollback(uow)
}
uow.syncFlag = true
@@ -661,6 +677,12 @@ class LevelDBStore extends ServiceSuppor
val subscriptions = collection.mutable.HashMap[(String, String), DurableSubscription]()
var firstSeq = 0L
+ def subscription_with_key(key:Long) = subscriptions.find(_._2.subKey == key).map(_._2)
+
+ override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
+ super.asyncAddQueueMessage(context, message, false)
+ }
+
def gcPosition:Option[(Long, Long)] = {
var pos = lastSeq.get()
subscriptions.synchronized {
@@ -685,7 +707,7 @@ class LevelDBStore extends ServiceSuppor
}
sub.lastAckPosition = if (retroactive) 0 else lastSeq.get()
waitOn(withUow{ uow=>
- uow.updateAckPosition(sub)
+ uow.updateAckPosition(sub.subKey, sub.lastAckPosition)
uow.countDownFuture
})
}
@@ -710,14 +732,14 @@ class LevelDBStore extends ServiceSuppor
def doUpdateAckPosition(uow: DelayableUOW, sub: DurableSubscription, position: Long) = {
sub.lastAckPosition = position
- uow.updateAckPosition(sub)
+ uow.updateAckPosition(sub.subKey, sub.lastAckPosition)
}
def acknowledge(context: ConnectionContext, clientId: String, subscriptionName: String, messageId: MessageId, ack: MessageAck): Unit = {
lookup(clientId, subscriptionName).foreach { sub =>
var position = db.queuePosition(messageId)
if( ack.getTransactionId!=null ) {
- transaction(ack.getTransactionId).updateAckPosition(this, sub, position)
+ transaction(ack.getTransactionId).updateAckPosition(this, sub, position, ack)
DONE
} else {
waitOn(withUow{ uow=>
@@ -748,7 +770,8 @@ class LevelDBStore extends ServiceSuppor
def getMessageCount(clientId: String, subscriptionName: String): Int = {
lookup(clientId, subscriptionName) match {
- case Some(sub) => (lastSeq.get - sub.lastAckPosition).toInt
+ case Some(sub) =>
+ (lastSeq.get - sub.lastAckPosition).toInt
case None => 0
}
}
Modified: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java?rev=1401956&r1=1401955&r2=1401956&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java Thu Oct 25 02:48:50 2012
@@ -27,15 +27,4 @@ public class LevelDBXARecoveryBrokerTest
broker.setPersistenceAdapter(store);
}
-
- @Override
- public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
-// XA Durable Subs not yet implemented
-// super.testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback();
- }
- @Override
- public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
-// XA Durable Subs not yet implemented
-// super.testTopicPersistentPreparedAcksAvailableAfterRollback();
- }
}