You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/25 04:48:50 UTC

svn commit: r1401956 - in /activemq/trunk/activemq-leveldb/src: main/scala/org/apache/activemq/leveldb/ test/scala/org/apache/activemq/leveldb/

Author: chirino
Date: Thu Oct 25 02:48:50 2012
New Revision: 1401956

URL: http://svn.apache.org/viewvc?rev=1401956&view=rev
Log:
Implementing AMQ-4134: Add XA support to the LevelDB store.

All XA tests are now working.

Modified:
    activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java

Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1401956&r1=1401955&r2=1401956&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Thu Oct 25 02:48:50 2012
@@ -41,7 +41,7 @@ case class QueueEntryRecord(id:MessageId
 case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
 case class QueueEntryRange()
 case class SubAckRecord(subKey:Long, ackPosition:Long)
-case class XaAckRecord(container:Long, seq:Long, ack:MessageAck)
+case class XaAckRecord(container:Long, seq:Long, ack:MessageAck, sub:Long = -1)
 
 sealed trait UowState {
   def stage:Int
@@ -217,13 +217,13 @@ class DelayableUOW(val manager:DBManager
     }
   }
 
-  def updateAckPosition(sub:DurableSubscription) = {
-    subAcks += SubAckRecord(sub.subKey, sub.lastAckPosition)
+  def updateAckPosition(sub_key:Long, ack_seq:Long) = {
+    subAcks += SubAckRecord(sub_key, ack_seq)
   }
 
-  def xaAck(container:Long, seq:Long, ack:MessageAck) = {
+  def xaAck(record:XaAckRecord) = {
     this.synchronized {
-      getAction(ack.getLastMessageId).xaAcks+=(XaAckRecord(container, seq, ack))
+      getAction(record.ack.getLastMessageId).xaAcks+=record
     }
     countDownFuture
   }
@@ -642,11 +642,11 @@ class DBManager(val parent:LevelDBStore)
 
   def getXAActions(key:Long) = {
     val msgs = ListBuffer[Message]()
-    val acks = ListBuffer[MessageAck]()
+    val acks = ListBuffer[XaAckRecord]()
     client.transactionCursor(key) { command =>
       command match {
         case message:Message => msgs += message
-        case record:XaAckRecord => acks += record.ack
+        case record:XaAckRecord => acks += record
       }
       true
     }

Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1401956&r1=1401955&r2=1401956&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Thu Oct 25 02:48:50 2012
@@ -954,11 +954,12 @@ class LevelDBClient(store: LevelDBStore)
         val offset = is.readInt()
         val qid = is.readLong()
         val seq = is.readLong()
+        val sub = is.readLong()
         val ack = store.wireFormat.unmarshal(is).asInstanceOf[MessageAck]
         ack.getLastMessageId.setDataLocator((log, offset))
         ack.getLastMessageId.setEntryLocator((qid, seq))
 
-        func(XaAckRecord(collectionKey, seq, ack))
+        func(XaAckRecord(collectionKey, seq, ack, sub))
       } else {
         var locator = (value.getValueLocation, value.getValueLength)
         val msg = getMessage(locator)
@@ -1134,7 +1135,7 @@ class LevelDBClient(store: LevelDBStore)
                 write_enqueue_total += System.nanoTime() - start
               }
 
-              action.xaAcks.foreach { entry =>
+              action.xaAcks.foreach { entry:XaAckRecord =>
                 val ack = entry.ack
                 if( dataLocator==null ) {
                   dataLocator = ack.getLastMessageId.getDataLocator match {
@@ -1144,6 +1145,7 @@ class LevelDBClient(store: LevelDBStore)
                       throw new RuntimeException("Unexpected locator type")
                   }
                 }
+                println(dataLocator)
 
                 val (qid, seq) = ack.getLastMessageId.getEntryLocator.asInstanceOf[(Long, Long)];
                 val os = new DataByteArrayOutputStream()
@@ -1151,6 +1153,7 @@ class LevelDBClient(store: LevelDBStore)
                 os.writeInt(dataLocator._2)
                 os.writeLong(qid)
                 os.writeLong(seq)
+                os.writeLong(entry.sub)
                 store.wireFormat.marshal(ack, os)
                 var ack_encoded = os.toBuffer
 

Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1401956&r1=1401955&r2=1401956&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Thu Oct 25 02:48:50 2012
@@ -222,16 +222,24 @@ class LevelDBStore extends ServiceSuppor
     for( (txid, transaction) <- transactions ) {
       assert( transaction.xacontainer_id != -1 )
       val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
-      transaction.xarecovery = (msgs, acks)
+      transaction.xarecovery = (msgs, acks.map(_.ack))
       for ( msg <- msgs ) {
         transaction.add(createMessageStore(msg.getDestination), msg, false);
       }
-      for ( ack <- acks ) {
-        // think we might have store design issue /w XA transactions and durable sub acks.
-        // does it even work for the other stores?
+      for ( record <- acks ) {
+        var ack = record.ack
         var store = createMessageStore(ack.getDestination)
-        store.preparedAcks.add(ack.getLastMessageId)
-        transaction.remove(store, ack);
+        if( record.sub == -1 ) {
+          store.preparedAcks.add(ack.getLastMessageId)
+          transaction.remove(store, ack);
+        } else {
+          val topic = store.asInstanceOf[LevelDBTopicMessageStore];
+          for ( sub <- topic.subscription_with_key(record.sub) ) {
+            val position = db.queuePosition(ack.getLastMessageId)
+            transaction.updateAckPosition( topic, sub, position, ack);
+            sub.lastAckPosition = position
+          }
+        }
       }
     }
     debug("started")
@@ -334,7 +342,7 @@ class LevelDBStore extends ServiceSuppor
 
         def prepare(uow:DelayableUOW) = {
           // add it to the xa container instead of the actual store container.
-          uow.xaAck(xacontainer_id, xaseqcounter.incrementAndGet, ack)
+          uow.xaAck(XaAckRecord(xacontainer_id, xaseqcounter.incrementAndGet, ack))
           xarecovery._2 += ack
           store.preparedAcks.add(ack.getLastMessageId)
         }
@@ -347,14 +355,22 @@ class LevelDBStore extends ServiceSuppor
       }
     }
 
-    def updateAckPosition(store:LevelDBStore#LevelDBTopicMessageStore, sub: DurableSubscription, position: Long) = {
+    def updateAckPosition(store:LevelDBStore#LevelDBTopicMessageStore, sub: DurableSubscription, position: Long, ack:MessageAck) = {
       commitActions += new TransactionAction() {
+        var prev_position = sub.lastAckPosition
+
         def commit(uow:DelayableUOW) = {
           store.doUpdateAckPosition(uow, sub, position)
         }
         def prepare(uow:DelayableUOW) = {
+          prev_position = sub.lastAckPosition
+          sub.lastAckPosition = position
+          uow.xaAck(XaAckRecord(xacontainer_id, xaseqcounter.incrementAndGet, ack, sub.subKey))
         }
         def rollback(uow: DelayableUOW) {
+          if ( prepared ) {
+            sub.lastAckPosition = prev_position
+          }
         }
       }
     }
@@ -393,7 +409,7 @@ class LevelDBStore extends ServiceSuppor
         if( tx.prepared ) {
           val done = new CountDownLatch(1)
           withUow { uow =>
-            for( action <- tx.commitActions ) {
+            for( action <- tx.commitActions.reverse ) {
               action.rollback(uow)
             }
             uow.syncFlag = true
@@ -661,6 +677,12 @@ class LevelDBStore extends ServiceSuppor
     val subscriptions = collection.mutable.HashMap[(String, String), DurableSubscription]()
     var firstSeq = 0L
 
+    def subscription_with_key(key:Long) = subscriptions.find(_._2.subKey == key).map(_._2)
+
+    override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
+      super.asyncAddQueueMessage(context, message, false)
+    }
+
     def gcPosition:Option[(Long, Long)] = {
       var pos = lastSeq.get()
       subscriptions.synchronized {
@@ -685,7 +707,7 @@ class LevelDBStore extends ServiceSuppor
       }
       sub.lastAckPosition = if (retroactive) 0 else lastSeq.get()
       waitOn(withUow{ uow=>
-        uow.updateAckPosition(sub)
+        uow.updateAckPosition(sub.subKey, sub.lastAckPosition)
         uow.countDownFuture
       })
     }
@@ -710,14 +732,14 @@ class LevelDBStore extends ServiceSuppor
 
     def doUpdateAckPosition(uow: DelayableUOW, sub: DurableSubscription, position: Long) = {
       sub.lastAckPosition = position
-      uow.updateAckPosition(sub)
+      uow.updateAckPosition(sub.subKey, sub.lastAckPosition)
     }
 
     def acknowledge(context: ConnectionContext, clientId: String, subscriptionName: String, messageId: MessageId, ack: MessageAck): Unit = {
       lookup(clientId, subscriptionName).foreach { sub =>
         var position = db.queuePosition(messageId)
         if(  ack.getTransactionId!=null ) {
-          transaction(ack.getTransactionId).updateAckPosition(this, sub, position)
+          transaction(ack.getTransactionId).updateAckPosition(this, sub, position, ack)
           DONE
         } else {
           waitOn(withUow{ uow=>
@@ -748,7 +770,8 @@ class LevelDBStore extends ServiceSuppor
     
     def getMessageCount(clientId: String, subscriptionName: String): Int = {
       lookup(clientId, subscriptionName) match {
-        case Some(sub) => (lastSeq.get - sub.lastAckPosition).toInt
+        case Some(sub) =>
+          (lastSeq.get - sub.lastAckPosition).toInt
         case None => 0
       }
     }

Modified: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java?rev=1401956&r1=1401955&r2=1401956&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java Thu Oct 25 02:48:50 2012
@@ -27,15 +27,4 @@ public class LevelDBXARecoveryBrokerTest
         broker.setPersistenceAdapter(store);
     }
 
-
-    @Override
-    public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
-//        XA Durable Subs not yet implemented
-//        super.testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback();
-    }
-    @Override
-    public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
-//        XA Durable Subs not yet implemented
-//        super.testTopicPersistentPreparedAcksAvailableAfterRollback();
-    }
 }