You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/25 01:08:37 UTC

svn commit: r1401911 - in /activemq/trunk/activemq-leveldb/src: main/scala/org/apache/activemq/leveldb/ test/scala/org/apache/activemq/leveldb/

Author: chirino
Date: Wed Oct 24 23:08:37 2012
New Revision: 1401911

URL: http://svn.apache.org/viewvc?rev=1401911&view=rev
Log:
Implementing AMQ-4134: Add XA support to the LevelDB store.

More test cases are now working. Only the XA test cases for durable subscriptions are still broken at this point.

Modified:
    activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
    activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java

Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1401911&r1=1401910&r2=1401911&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Wed Oct 24 23:08:37 2012
@@ -21,7 +21,7 @@ import org.fusesource.hawtdispatch._
 import org.fusesource.hawtdispatch.BaseRetained
 import java.util.concurrent._
 import atomic._
-import org.fusesource.hawtbuf.Buffer
+import org.fusesource.hawtbuf.{DataByteArrayOutputStream, DataByteArrayInputStream, ByteArrayOutputStream, Buffer}
 import org.apache.activemq.store.MessageRecoveryListener
 import java.lang.ref.WeakReference
 import scala.Option._
@@ -41,7 +41,7 @@ case class QueueEntryRecord(id:MessageId
 case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
 case class QueueEntryRange()
 case class SubAckRecord(subKey:Long, ackPosition:Long)
-case class XaAckRecord(container:Long, seq:Long, ack:Buffer)
+case class XaAckRecord(container:Long, seq:Long, ack:MessageAck)
 
 sealed trait UowState {
   def stage:Int
@@ -130,7 +130,6 @@ class DelayableUOW(val manager:DBManager
   val uowId:Int = manager.lastUowId.incrementAndGet()
   var actions = Map[MessageId, MessageAction]()
   var subAcks = ListBuffer[SubAckRecord]()
-  var xaAcks = ListBuffer[XaAckRecord]()
   var completed = false
   var disableDelay = false
   var delayableActions = 0
@@ -147,25 +146,26 @@ class DelayableUOW(val manager:DBManager
   def syncNeeded = syncFlag || actions.find( _._2.syncNeeded ).isDefined
   def size = 100+actions.foldLeft(0L){ case (sum, entry) =>
     sum + (entry._2.size+100)
-  } + (subAcks.size * 100) + xaAcks.foldLeft(0L){ case (sum, entry) =>
-    sum + entry.ack.length
-  }
+  } + (subAcks.size * 100)
 
   class MessageAction {
     var id:MessageId = _
     var messageRecord: MessageRecord = null
     var enqueues = ListBuffer[QueueEntryRecord]()
     var dequeues = ListBuffer[QueueEntryRecord]()
+    var xaAcks = ListBuffer[XaAckRecord]()
 
     def uow = DelayableUOW.this
-    def isEmpty() = messageRecord==null && enqueues==Nil && dequeues==Nil
+    def isEmpty() = messageRecord==null && enqueues.isEmpty && dequeues.isEmpty && xaAcks.isEmpty
 
     def cancel() = {
       uow.rm(id)
     }
 
     def syncNeeded = messageRecord!=null && messageRecord.syncNeeded
-    def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50)
+    def size = (if(messageRecord!=null) messageRecord.data.length+20 else 0) + ((enqueues.size+dequeues.size)*50) + xaAcks.foldLeft(0L){ case (sum, entry) =>
+      sum + 100
+    }
     
     def addToPendingStore() = {
       var set = manager.pendingStores.get(id)
@@ -222,8 +222,10 @@ class DelayableUOW(val manager:DBManager
   }
 
   def xaAck(container:Long, seq:Long, ack:MessageAck) = {
-    var packet = manager.parent.wireFormat.marshal(ack)
-    xaAcks += XaAckRecord(container, seq, new Buffer(packet.data, packet.offset, packet.length))
+    this.synchronized {
+      getAction(ack.getLastMessageId).xaAcks+=(XaAckRecord(container, seq, ack))
+    }
+    countDownFuture
   }
 
   def enqueue(queueKey:Long, queueSeq:Long, message:Message, delay_enqueue:Boolean)  = {
@@ -641,12 +643,10 @@ class DBManager(val parent:LevelDBStore)
   def getXAActions(key:Long) = {
     val msgs = ListBuffer[Message]()
     val acks = ListBuffer[MessageAck]()
-    println("transactionCursor")
     client.transactionCursor(key) { command =>
-      println("recovered command: "+command)
       command match {
         case message:Message => msgs += message
-        case ack:MessageAck => acks += ack
+        case record:XaAckRecord => acks += record.ack
       }
       true
     }

Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1401911&r1=1401910&r2=1401911&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Wed Oct 24 23:08:37 2012
@@ -944,13 +944,21 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
 
-  def transactionCursor(collectionKey: Long)(func: (DataStructure)=>Boolean) = {
+  def transactionCursor(collectionKey: Long)(func: (AnyRef)=>Boolean) = {
     collectionCursor(collectionKey, encodeLong(0)) { (key, value) =>
       val seq = decodeLong(key)
       if( value.getMeta != null ) {
-        val data = value.getMeta
-        val ack = store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[MessageAck].asInstanceOf[MessageAck]
-        func(ack)
+
+        val is = new DataByteArrayInputStream(value.getMeta);
+        val log = is.readLong()
+        val offset = is.readInt()
+        val qid = is.readLong()
+        val seq = is.readLong()
+        val ack = store.wireFormat.unmarshal(is).asInstanceOf[MessageAck]
+        ack.getLastMessageId.setDataLocator((log, offset))
+        ack.getLastMessageId.setEntryLocator((qid, seq))
+
+        func(XaAckRecord(collectionKey, seq, ack))
       } else {
         var locator = (value.getValueLocation, value.getValueLength)
         val msg = getMessage(locator)
@@ -1068,7 +1076,7 @@ class LevelDBClient(store: LevelDBStore)
                   dataLocator = entry.id.getDataLocator match {
                     case x:(Long, Int) => x
                     case x:MessageRecord => x.locator
-                    case _ => throw new RuntimeException("Unexpected locator type")
+                    case _ => throw new RuntimeException("Unexpected locator type: "+dataLocator)
                   }
                 }
 
@@ -1126,18 +1134,37 @@ class LevelDBClient(store: LevelDBStore)
                 write_enqueue_total += System.nanoTime() - start
               }
 
-            }
+              action.xaAcks.foreach { entry =>
+                val ack = entry.ack
+                if( dataLocator==null ) {
+                  dataLocator = ack.getLastMessageId.getDataLocator match {
+                    case x:(Long, Int) => x
+                    case x:MessageRecord => x.locator
+                    case _ =>
+                      throw new RuntimeException("Unexpected locator type")
+                  }
+                }
+
+                val (qid, seq) = ack.getLastMessageId.getEntryLocator.asInstanceOf[(Long, Long)];
+                val os = new DataByteArrayOutputStream()
+                os.writeLong(dataLocator._1)
+                os.writeInt(dataLocator._2)
+                os.writeLong(qid)
+                os.writeLong(seq)
+                store.wireFormat.marshal(ack, os)
+                var ack_encoded = os.toBuffer
+
+                val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
+                val log_record = new EntryRecord.Bean()
+                log_record.setCollectionKey(entry.container)
+                log_record.setEntryKey(new Buffer(key, 9, 8))
+                log_record.setMeta(ack_encoded)
+                appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
+                val index_record = new EntryRecord.Bean()
+                index_record.setMeta(ack_encoded)
+                batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
+              }
 
-            uow.xaAcks.foreach { entry =>
-              val key = encodeEntryKey(ENTRY_PREFIX, entry.container, entry.seq)
-              val log_record = new EntryRecord.Bean()
-              log_record.setCollectionKey(entry.container)
-              log_record.setEntryKey(new Buffer(key, 9, 8))
-              log_record.setMeta(entry.ack)
-              appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
-              val index_record = new EntryRecord.Bean()
-              index_record.setValue(entry.ack)
-              batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray)
             }
 
             uow.subAcks.foreach { entry =>

Modified: activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1401911&r1=1401910&r2=1401911&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Wed Oct 24 23:08:37 2012
@@ -229,7 +229,9 @@ class LevelDBStore extends ServiceSuppor
       for ( ack <- acks ) {
         // think we might have store design issue /w XA transactions and durable sub acks.
         // does it even work for the other stores?
-        transaction.remove(createMessageStore(ack.getDestination), ack);
+        var store = createMessageStore(ack.getDestination)
+        store.preparedAcks.add(ack.getLastMessageId)
+        transaction.remove(store, ack);
       }
     }
     debug("started")
@@ -322,16 +324,25 @@ class LevelDBStore extends ServiceSuppor
 
     def remove(store:LevelDBStore#LevelDBMessageStore, ack:MessageAck) = {
       commitActions += new TransactionAction() {
+
         def commit(uow:DelayableUOW) = {
           store.doRemove(uow, ack.getLastMessageId)
+          if( prepared ) {
+            store.preparedAcks.remove(ack.getLastMessageId)
+          }
         }
+
         def prepare(uow:DelayableUOW) = {
           // add it to the xa container instead of the actual store container.
           uow.xaAck(xacontainer_id, xaseqcounter.incrementAndGet, ack)
           xarecovery._2 += ack
+          store.preparedAcks.add(ack.getLastMessageId)
         }
 
         def rollback(uow: DelayableUOW) {
+          if( prepared ) {
+            store.preparedAcks.remove(ack.getLastMessageId)
+          }
         }
       }
     }
@@ -380,6 +391,15 @@ class LevelDBStore extends ServiceSuppor
         println("The transaction does not exist")
       case Some(tx)=>
         if( tx.prepared ) {
+          val done = new CountDownLatch(1)
+          withUow { uow =>
+            for( action <- tx.commitActions ) {
+              action.rollback(uow)
+            }
+            uow.syncFlag = true
+            uow.addCompleteListener { done.countDown() }
+          }
+          done.await()
           db.removeTransactionContainer(tx.xacontainer_id)
         }
     }
@@ -394,12 +414,18 @@ class LevelDBStore extends ServiceSuppor
     }
   }
 
+  var doingRecover = false
   def recover(listener: TransactionRecoveryListener) = {
-    for ( (txid, transaction) <- transactions ) {
-      if( transaction.prepared ) {
-        val (msgs, acks) = transaction.xarecovery
-        listener.recover(txid.asInstanceOf[XATransactionId], msgs.toArray, acks.toArray);
+    this.doingRecover = true
+    try {
+      for ( (txid, transaction) <- transactions ) {
+        if( transaction.prepared ) {
+          val (msgs, acks) = transaction.xarecovery
+          listener.recover(txid.asInstanceOf[XATransactionId], msgs.toArray, acks.toArray);
+        }
       }
+    } finally {
+      this.doingRecover = false
     }
   }
 
@@ -490,6 +516,7 @@ class LevelDBStore extends ServiceSuppor
 
     protected val lastSeq: AtomicLong = new AtomicLong(0)
     protected var cursorPosition: Long = 0
+    val preparedAcks = new HashSet[MessageId]()
 
     lastSeq.set(db.getLastQueueEntrySeq(key))
 
@@ -556,7 +583,25 @@ class LevelDBStore extends ServiceSuppor
     }
 
     def recover(listener: MessageRecoveryListener): Unit = {
-      cursorPosition = db.cursorMessages(key, listener, 0)
+      cursorPosition = db.cursorMessages(key, preparedExcluding(listener), 0)
+    }
+
+    def preparedExcluding(listener: MessageRecoveryListener) = new MessageRecoveryListener {
+      def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
+      def hasSpace = listener.hasSpace
+      def recoverMessageReference(ref: MessageId) = {
+        if (!preparedAcks.contains(ref)) {
+          listener.recoverMessageReference(ref)
+        }
+        true
+      }
+
+      def recoverMessage(message: Message) = {
+        if (!preparedAcks.contains(message.getMessageId)) {
+          listener.recoverMessage(message)
+        }
+        true
+      }
     }
 
     def resetBatching: Unit = {
@@ -564,7 +609,7 @@ class LevelDBStore extends ServiceSuppor
     }
 
     def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
-      cursorPosition = db.cursorMessages(key, LimitingRecoveryListener(maxReturned, listener), cursorPosition)
+      cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), cursorPosition)
     }
 
     override def setBatch(id: MessageId): Unit = {
@@ -697,7 +742,7 @@ class LevelDBStore extends ServiceSuppor
     
     def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
       lookup(clientId, subscriptionName).foreach { sub =>
-        sub.cursorPosition = db.cursorMessages(key,  LimitingRecoveryListener(maxReturned, listener), sub.cursorPosition.max(sub.lastAckPosition+1))
+        sub.cursorPosition = db.cursorMessages(key,  preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1))
       }
     }
     

Modified: activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java?rev=1401911&r1=1401910&r2=1401911&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-leveldb/src/test/scala/org/apache/activemq/leveldb/LevelDBXARecoveryBrokerTest.java Wed Oct 24 23:08:37 2012
@@ -27,21 +27,15 @@ public class LevelDBXARecoveryBrokerTest
         broker.setPersistenceAdapter(store);
     }
 
-    // TODO: The following test cases are failing...
-
-    @Override
-    public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception {
-    }
-
-    @Override
-    public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
-    }
 
     @Override
     public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
+//        XA Durable Subs not yet implemented
+//        super.testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback();
     }
-
     @Override
     public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
+//        XA Durable Subs not yet implemented
+//        super.testTopicPersistentPreparedAcksAvailableAfterRollback();
     }
 }