You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:06:48 UTC

svn commit: r961128 [1/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-dto/src/main/java/org/apache/activemq/apollo/dto/ activemq-hawtdb/src/main/proto/ activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/ activemq-hawtdb/s...

Author: chirino
Date: Wed Jul  7 04:06:47 2010
New Revision: 961128

URL: http://svn.apache.org/viewvc?rev=961128&view=rev
Log:
making more progress on the hawtdb-store

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerTest.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Codecs.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Callback.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DestinationEntity.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/DuplicateKeyException.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/FatalStoreException.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/KeyNotFoundException.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/VoidCallback.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java?rev=961128&r1=961127&r2=961128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/HawtDBStoreDTO.java Wed Jul  7 04:06:47 2010
@@ -32,8 +32,8 @@ public class HawtDBStoreDTO extends Stor
     @XmlAttribute(name="directory", required=false)
     public File directory;
 
-	@XmlAttribute(name="checkpoint-interval", required=false)
-	public long checkpointInterval = 5 * 1000L;
+	@XmlAttribute(name="index-flush-interval", required=false)
+	public long indexFlushInterval = 5 * 1000L;
 
 	@XmlAttribute(name="cleanup-interval", required=false)
 	public long cleanupInterval = 30 * 1000L;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto?rev=961128&r1=961127&r2=961128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto Wed Jul  7 04:06:47 2010
@@ -154,18 +154,27 @@ message RemoveStream {
 ///////////////////////////////////////////////////////////////
 // Index Structures
 ///////////////////////////////////////////////////////////////
-message RootRecord {
+message DatabaseRootRecord {
 
   required fixed32 state=1;
   required fixed64 lastMessageKey=2;
   required fixed64 firstInProgressBatch=3;
   required fixed64 lastUpdateLocation=4;
 
-  required fixed32 locationIndexPage=5;
+  required fixed32 dataFileRefIndexPage=5;
   required fixed32 messageKeyIndexPage=6;
   required fixed32 messageRefsIndexPage=7;
-  required fixed32 destinationIndexPage=8;
+  required fixed32 queueIndexPage=8;
   required fixed32 subscriptionIndexPage=10;
   required fixed32 mapIndexPage=11;
   
-}
\ No newline at end of file
+}
+
+message QueueRootRecord {
+  required AddQueue info=1;
+  required int64 size=2;
+  required int64 count=3;
+  required fixed32 entryIndexPage=4;
+  required fixed32 trackingIndexPage=5;
+}
+

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961128&r1=961127&r2=961128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:06:47 2010
@@ -26,9 +26,6 @@ import org.apache.activemq.apollo.store.
 import org.fusesource.hawtbuf.proto.MessageBuffer
 import org.fusesource.hawtbuf.proto.PBMessage
 import org.apache.activemq.util.LockFile
-import org.fusesource.hawtdb.api.{Transaction, TxPageFileFactory}
-import java.util.HashSet
-import collection.mutable.{HashMap, ListBuffer}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import org.fusesource.hawtdb.internal.journal.{JournalCallback, Journal, Location}
 import org.fusesource.hawtdispatch.TaskTracker
@@ -39,84 +36,52 @@ import org.apache.activemq.broker.store.
 import org.fusesource.hawtbuf._
 import org.fusesource.hawtdispatch.ScalaDispatch._
 import org.apache.activemq.apollo.broker.{Log, Logging, BaseService}
+import collection.mutable.{LinkedHashMap, HashMap, ListBuffer}
+import collection.JavaConversions
+import java.util.{TreeSet, HashSet}
+import org.fusesource.hawtdb.api._
 
 object HawtDBClient extends Log {
-  
-  type PB = PBMessage[_ <: PBMessage[_,_], _ <: MessageBuffer[_,_]]
-
-  implicit def toPBMessage(value:TypeCreatable):PB = value.asInstanceOf[PB]
-
   val BEGIN = -1
   val COMMIT = -2
+  val ROLLBACK = -3
 
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000
 
   val CLOSED_STATE = 1
   val OPEN_STATE = 2
-
-  implicit def decodeMessageRecord(pb: AddMessage.Getter): MessageRecord = {
-    val rc = new MessageRecord
-    rc.protocol = pb.getProtocol
-    rc.size = pb.getSize
-    rc.value = pb.getValue
-    rc.stream = pb.getStreamKey
-    rc.expiration = pb.getExpiration
-    rc
-  }
-
-  implicit def encodeMessageRecord(v: MessageRecord): AddMessage.Bean = {
-    val pb = new AddMessage.Bean
-    pb.setProtocol(v.protocol)
-    pb.setSize(v.size)
-    pb.setValue(v.value)
-    pb.setStreamKey(v.stream)
-    pb.setExpiration(v.expiration)
-    pb
-  }
-
-  implicit def decodeQueueEntryRecord(pb: AddQueueEntry.Getter): QueueEntryRecord = {
-    val rc = new QueueEntryRecord
-    rc.messageKey = pb.getMessageKey
-    rc.attachment = pb.getAttachment
-    rc.size = pb.getSize
-    rc.redeliveries = pb.getRedeliveries.toShort
-    rc
-  }
-
-  implicit def encodeQueueEntryRecord(v: QueueEntryRecord): AddQueueEntry.Bean = {
-    val pb = new AddQueueEntry.Bean
-    pb.setMessageKey(v.messageKey)
-    pb.setAttachment(v.attachment)
-    pb.setSize(v.size)
-    pb.setRedeliveries(v.redeliveries)
-    pb
-  }
 }
 
 /**
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class HawtDBClient() extends Logging {
+class HawtDBClient(hawtDBStore: HawtDBStore) extends Logging {
   import HawtDBClient._
+  import Helpers._
 
   override def log: Log = HawtDBClient
 
-  val dispatchQueue = createQueue("hawtdb store")
+  def dispatchQueue = hawtDBStore.dispatchQueue
 
 
   private val pageFileFactory = new TxPageFileFactory()
   private var journal: Journal = null
 
   private var lockFile: LockFile = null
-  private var nextRecoveryPosition: Location = null
-  private var lastRecoveryPosition: Location = null
   private val trackingGen = new AtomicLong(0)
+  private val lockedDatatFiles = new HashSet[java.lang.Integer]()
 
-  private val journalFilesBeingReplicated = new HashSet[Integer]()
   private var recovering = false
+  private var nextRecoveryPosition: Location = null
+  private var lastRecoveryPosition: Location = null
+  private var recoveryCounter = 0
 
-  //protected RootEntity rootEntity = new RootEntity()
+  var databaseRootRecord = new DatabaseRootRecord.Bean
+
+
+  val next_batch_counter = new AtomicInteger(0)
+  private var batches = new LinkedHashMap[Int, (Location, ListBuffer[Update])]()
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -128,7 +93,7 @@ class HawtDBClient() extends Logging {
 
   private def journalMaxFileLength = config.journalLogSize
 
-  private def checkpointInterval = config.checkpointInterval
+  private def checkpointInterval = config.indexFlushInterval
 
   private def cleanupInterval = config.cleanupInterval
 
@@ -139,7 +104,7 @@ class HawtDBClient() extends Logging {
 
   /////////////////////////////////////////////////////////////////////
   //
-  // Public interface
+  // Public interface used by the HawtDBStore
   //
   /////////////////////////////////////////////////////////////////////
 
@@ -168,12 +133,17 @@ class HawtDBClient() extends Logging {
     }
   }
 
+  def createJournal() = {
+    val journal = new Journal()
+    journal.setDirectory(directory)
+    journal.setMaxFileLength(config.journalLogSize)
+    journal
+  }
+
   def start() = {
     lock {
 
-      journal = new Journal()
-      journal.setDirectory(directory)
-      journal.setMaxFileLength(config.journalLogSize)
+      journal = createJournal()
       journal.start
 
       pageFileFactory.setFile(new File(directory, "db"))
@@ -182,14 +152,30 @@ class HawtDBClient() extends Logging {
       pageFileFactory.setUseWorkerThread(true)
       pageFileFactory.open()
 
-      withTx {tx =>
-        if (!tx.allocator().isAllocated(0)) {
-          //            rootEntity.allocate(tx)
-        }
-        //        rootEntity.load(tx)
+      withTx { tx =>
+          val helper = new TxHelper(tx)
+          import helper._
+
+          if (!tx.allocator().isAllocated(0)) {
+            val rootPage = tx.alloc()
+            assert(rootPage == 0)
+
+            databaseRootRecord.setQueueIndexPage(alloc(QUEUE_INDEX_FACTORY))
+            databaseRootRecord.setMessageKeyIndexPage(alloc(MESSAGE_KEY_INDEX_FACTORY))
+            databaseRootRecord.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
+            databaseRootRecord.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
+            databaseRootRecord.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
+
+            tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze)
+            databaseRootRecord = databaseRootRecord.copy
+          } else {
+            databaseRootRecord = tx.get(DATABASE_ROOT_RECORD_ACCESSOR, 0).copy
+          }
       }
+
       pageFile.flush()
-      //        recover()
+      recover
+
       //        trackingGen.set(rootEntity.getLastMessageTracking() + 1)
 
       //      checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
@@ -227,7 +213,6 @@ class HawtDBClient() extends Logging {
   def stop() = {
   }
 
-
   def addQueue(record: QueueRecord) = {
     val update = new AddQueue.Bean()
     update.setKey(record.key)
@@ -236,177 +221,160 @@ class HawtDBClient() extends Logging {
     store(update)
   }
 
+  def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
+    var batch = List[TypeCreatable]()
+    txs.foreach {
+      tx =>
+        tx.actions.foreach {
+          case (msg, action) =>
+            if (action.store != null) {
+              val update: AddMessage.Bean = action.store
+              batch ::= update
+            }
+            action.enqueues.foreach {
+              queueEntry =>
+                val update: AddQueueEntry.Bean = queueEntry
+                batch ::= update
+            }
+            action.dequeues.foreach {
+              queueEntry =>
+                val qid = queueEntry.queueKey
+                val seq = queueEntry.queueSeq
+                batch ::= new RemoveQueueEntry.Bean().setQueueKey(qid).setQueueSeq(seq)
+            }
+        }
+    }
+    store(batch)
+  }
+
   def purge() = {
-//    withSession {
-//      session =>
-//        session.list(schema.queue_name).map {
-//          x =>
-//            val qid: Long = x.name
-//            session.remove(schema.entries \ qid)
-//        }
-//        session.remove(schema.queue_name)
-//        session.remove(schema.message_data)
-//    }
+    val update = new Purge.Bean()
+    store(update)
   }
 
   def listQueues: Seq[Long] = {
-    null
-//    withSession {
-//      session =>
-//        session.list(schema.queue_name).map {
-//          x =>
-//            val id: Long = x.name
-//            id
-//        }
-//    }
-  }
-
-  def getQueueStatus(id: Long): Option[QueueStatus] = {
-    null
-//    withSession {
-//      session =>
-//        session.get(schema.queue_name \ id) match {
-//          case Some(x) =>
-//
-//            val rc = new QueueStatus
-//            rc.record = new QueueRecord
-//            rc.record.key = id
-//            rc.record.name = new AsciiBuffer(x.value)
-//
-//            //            rc.count = session.count( schema.entries \ id )
-//
-//            // TODO
-//            //          rc.count =
-//            //          rc.first =
-//            //          rc.last =
-//
-//            Some(rc)
-//          case None =>
-//            None
-//        }
-//    }
+    withTx { tx =>
+        val helper = new TxHelper(tx)
+        import JavaConversions._
+        import helper._
+        queueIndex.iterator.map {
+          entry =>
+            entry.getKey.longValue
+        }.toSeq
+    }
   }
 
+  def getQueueStatus(queueKey: Long): Option[QueueStatus] = {
+    withTx { tx =>
+        val helper = new TxHelper(tx)
+        import JavaConversions._
+        import helper._
+
+        val queueRecord = queueIndex.get(queueKey)
+        if (queueRecord != null) {
+          val rc = new QueueStatus
+          rc.record = new QueueRecord
+          rc.record.key = queueKey
+          rc.record.name = queueRecord.getInfo.getName
+          rc.record.queueType = queueRecord.getInfo.getQueueType
+          rc.count = queueRecord.getCount.toInt
+          rc.size = queueRecord.getSize
+
+          // TODO
+          // rc.first =
+          // rc.last =
+
+          Some(rc)
+        } else {
+          None
+        }
+    }
+  }
 
-  def store(txs: Seq[HawtDBStore#HawtDBBatch]) {
-//    withSession {
-//      session =>
-//              var operations = List[Operation]()
-//              txs.foreach {
-//                tx =>
-//                  tx.actions.foreach {
-//                    case (msg, action) =>
-//                      var rc =
-//                      if (action.store != null) {
-//                        operations ::= Insert( schema.message_data \ (msg, action.store) )
-//                      }
-//                      action.enqueues.foreach {
-//                        queueEntry =>
-//                          val qid = queueEntry.queueKey
-//                          val seq = queueEntry.queueSeq
-//                          operations ::= Insert( schema.entries \ qid \ (seq, queueEntry) )
-//                      }
-//                      action.dequeues.foreach {
-//                        queueEntry =>
-//                          val qid = queueEntry.queueKey
-//                          val seq = queueEntry.queueSeq
-//                          operations ::= Delete( schema.entries \ qid, ColumnPredicate(seq :: Nil) )
-//                      }
-//                  }
-//              }
-//              session.batch(operations)
-//    }
-  }
-
-  def loadMessage(id: Long): Option[MessageRecord] = {
-    null
-//    withSession {
-//      session =>
-//        session.get(schema.message_data \ id) match {
-//          case Some(x) =>
-//            val rc: MessageRecord = x.value
-//            rc.key = id
-//            Some(rc)
-//          case None =>
-//            None
-//        }
-//    }
-  }
-
-  def getQueueEntries(qid: Long): Seq[QueueEntryRecord] = {
-    null
-//    withSession {
-//      session =>
-//        session.list(schema.entries \ qid).map {
-//          x =>
-//            val rc: QueueEntryRecord = x.value
-//            rc.queueKey = qid
-//            rc.queueSeq = x.name
-//            rc
-//        }
-//    }
+
+  def getQueueEntries(queueKey: Long): Seq[QueueEntryRecord] = {
+    withTx { tx =>
+        val helper = new TxHelper(tx)
+        import JavaConversions._
+        import helper._
+
+        val queueRecord = queueIndex.get(queueKey)
+        if (queueRecord != null) {
+          val entryIndex = queueEntryIndex(queueRecord)
+          entryIndex.iterator.map {
+            entry =>
+              val rc: QueueEntryRecord = entry.getValue
+              rc
+          }.toSeq
+        } else {
+          Nil.toSeq
+        }
+    }
   }
 
+  def loadMessage(messageKey: Long): Option[MessageRecord] = {
+    withTx { tx =>
+        val helper = new TxHelper(tx)
+        import JavaConversions._
+        import helper._
+
+        val location = messageKeyIndex.get(messageKey)
+        if (location != null) {
+          load(location, classOf[AddMessage.Getter]) match {
+            case Some(x) =>
+              val messageRecord: MessageRecord = x
+              Some(messageRecord)
+            case None => None
+          }
+        } else {
+          None
+        }
+    }
+  }
+
+
   /////////////////////////////////////////////////////////////////////
   //
-  // Implementation
+  // Batch/Transactional interface to storing/accessing journaled updates.
   //
   /////////////////////////////////////////////////////////////////////
 
-  private def withTx[T](func: (Transaction) => T) {
-    val tx = pageFile.tx
-    var ok = false
+  private def load[T <: TypeCreatable](location: Location, expected: Class[T]): Option[T] = {
     try {
-      val rc = func(tx)
-      ok = true
-      rc
-    } finally {
-      if (ok) {
-        tx.commit
-      } else {
-        tx.rollback
+      read(location) match {
+        case (updateType, data) =>
+          Some(expected.cast(decode(location, updateType, data)))
       }
+    } catch {
+      case e: Exception =>
+        debug("Could not load journal record at: %s", location)
+        None
     }
   }
 
-  val next_batch_counter = new AtomicInteger(0)
-
-  // Gets the next batch id.. after a while we may wrap around
-  // start producing batch ids from zero
-  val next_batch_id = {
-    var rc = next_batch_counter.getAndIncrement
-    while (rc < 0) {
-      // We just wrapped around.. reset the counter to 0
-      // Use a CAS operation so that only 1 thread resets the counter
-      next_batch_counter.compareAndSet(rc + 1, 0)
-      rc = next_batch_counter.getAndIncrement
-    }
-    rc
-  }
-
-
-  private def store(updates: List[TypeCreatable]):Unit = {
+  private def store(updates: List[TypeCreatable]): Unit = {
     val tracker = new TaskTracker("storing")
-    store( updates, tracker.task(updates))
+    store(updates, tracker.task(updates))
     tracker.await
   }
 
-  private def store(update: TypeCreatable):Unit = {
+  private def store(update: TypeCreatable): Unit = {
     val tracker = new TaskTracker("storing")
-    store( update, tracker.task(update))
+    store(update, tracker.task(update))
     tracker.await
   }
 
-  private def store(updates: List[TypeCreatable], onComplete: Runnable):Unit = {
+  private def store(updates: List[TypeCreatable], onComplete: Runnable): Unit = {
     val batch = next_batch_id
     begin(batch)
-    updates.foreach {update =>
-      store(batch, update, null)
+    updates.foreach {
+      update =>
+        store(batch, update, null)
     }
     commit(batch, onComplete)
   }
 
-  private def store(update: TypeCreatable, onComplete: Runnable):Unit = store(-1, update, onComplete)
+  private def store(update: TypeCreatable, onComplete: Runnable): Unit = store(-1, update, onComplete)
 
   /**
    * All updated are are funneled through this method. The updates are logged to
@@ -415,7 +383,7 @@ class HawtDBClient() extends Logging {
    *
    * @throws IOException
    */
-  private def store(batch: Int, update: TypeCreatable, onComplete: Runnable):Unit = {
+  private def store(batch: Int, update: TypeCreatable, onComplete: Runnable): Unit = {
     val kind = update.asInstanceOf[TypeCreatable]
     val frozen = update.freeze
     val baos = new DataByteArrayOutputStream(frozen.serializedSizeUnframed + 1)
@@ -423,191 +391,615 @@ class HawtDBClient() extends Logging {
     baos.writeInt(batch)
     frozen.writeUnframed(baos)
 
-    journal(baos.toBuffer()) {location =>
-      store(batch, update, onComplete, location)
+    append(baos.toBuffer()) {
+      location =>
+        executeStore(batch, update, onComplete, location)
     }
   }
 
-
   /**
    */
-  private def begin(batch: Int):Unit = {
+  private def begin(batch: Int): Unit = {
     val baos = new DataByteArrayOutputStream(5)
     baos.writeByte(BEGIN)
     baos.writeInt(batch)
-    journal(baos.toBuffer) {location =>
-      begin(batch, location)
+    append(baos.toBuffer) {
+      location =>
+        executeBegin(batch, location)
     }
   }
 
   /**
    */
-  private def commit(batch: Int, onComplete: Runnable):Unit = {
+  private def commit(batch: Int, onComplete: Runnable): Unit = {
     val baos = new DataByteArrayOutputStream(5)
     baos.writeByte(COMMIT)
     baos.writeInt(batch)
-    journal(baos.toBuffer) {location =>
-      commit(batch, onComplete, location)
+    append(baos.toBuffer) {
+      location =>
+        executeCommit(batch, onComplete, location)
     }
   }
 
-  private def journal(data: Buffer)(cb: (Location) => Unit):Unit = {
-    val start = System.currentTimeMillis()
-    try {
-      journal.write(data, new JournalCallback() {
-        def success(location: Location) = {
-          cb(location)
-        }
-      })
-    } finally {
-      val end = System.currentTimeMillis()
-      if (end - start > 1000) {
-        warn("KahaDB long enqueue time: Journal add took: " + (end - start) + " ms")
-      }
+  private def rollback(batch: Int, onComplete: Runnable): Unit = {
+    val baos = new DataByteArrayOutputStream(5)
+    baos.writeByte(ROLLBACK)
+    baos.writeInt(batch)
+    append(baos.toBuffer) {
+      location =>
+        executeRollback(batch, onComplete, location)
     }
   }
 
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Methods related to recovery
+  //
+  /////////////////////////////////////////////////////////////////////
 
   /**
-   * Move all the messages that were in the journal into long term storage. We
-   * just replay and do a checkpoint.
+   * Move all the messages that were in the journal into the indexes.
    *
    * @throws IOException
    * @throws IOException
    * @throws IllegalStateException
    */
-  def recover = {
+  def recover: Unit = {
+    recoveryCounter = 0
+    lastRecoveryPosition = null
+    val start = System.currentTimeMillis()
+    incrementalRecover
+
+    store(new AddTrace.Bean().setMessage("RECOVERED"), ^ {
+      // Rollback any batches that did not complete.
+      batches.keysIterator.foreach {
+        batch =>
+          rollback(batch, null)
+      }
+    })
+
+    val end = System.currentTimeMillis()
+    info("Processed %d operations from the journal in %,.3f seconds.", recoveryCounter, ((end - start) / 1000.0f))
+  }
+
+
+  /**
+   * incrementally recovers the journal.  It can be run again and again
+   * if the journal is being appended to.
+   */
+  def incrementalRecover(): Unit = {
+
+    // Is this our first incremental recovery pass?
+    if (lastRecoveryPosition == null) {
+      if (databaseRootRecord.hasFirstInProgressBatch) {
+        // we have to start at the first in progress batch usually...
+        nextRecoveryPosition = databaseRootRecord.getFirstInProgressBatch
+      } else {
+        // but perhaps there were no batches in progress..
+        if (databaseRootRecord.hasLastUpdateLocation) {
+          // then we can just continue from the last update applied to the index
+          nextRecoveryPosition = journal.getNextLocation(databaseRootRecord.getLastUpdateLocation)
+        } else {
+          // no updates in the index?.. start from the first record in the journal.
+          nextRecoveryPosition = journal.getNextLocation(null)
+        }
+      }
+    } else {
+      nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
+    }
+
     try {
-      val start = System.currentTimeMillis()
       recovering = true
-      var location = getRecoveryPosition()
-      if (location != null) {
-        var counter = 0
-        var uow: Transaction = null
-        val uowCounter = 0
-        while (location != null) {
-          import BufferEditor.BIG_ENDIAN._
-
-          var data = journal.read(location)
-          val updateType = readByte(data)
-          val batch = readInt(data)
-          updateType match {
-            case BEGIN => begin(batch, location)
-            case COMMIT => commit(batch, null, location)
-            case _ =>
-              val update = decode(location, updateType, data)
-              store(batch, update, null, location)
-          }
 
-          counter += 1
-          location = journal.getNextLocation(location)
-        }
-        val end = System.currentTimeMillis()
-        info("Processed %d operations from the journal in %,.3f seconds.", counter, ((end - start) / 1000.0f))
+      // Continue recovering until journal runs out of records.
+      while (nextRecoveryPosition != null) {
+        lastRecoveryPosition = nextRecoveryPosition
+        recover(lastRecoveryPosition)
+        nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
       }
 
-      // We may have to undo some index updates.
-//      withTx {tx =>
-//        recoverIndex(tx)
-//      }
     } finally {
       recovering = false
     }
   }
 
-  def decode(location:Location, updateType:Int, value:Buffer) = {
-      val t = Type.valueOf(updateType);
-      if (t == null) {
-          throw new IOException("Could not load journal record. Invalid type at location: " + location);
-      }
-      t.parseUnframed(value).asInstanceOf[TypeCreatable]
+  /**
+   * Recovers the logged record at the specified location.
+   */
+  def recover(location: Location): Unit = {
+    var data = journal.read(location)
+
+    val editor = data.bigEndianEditor
+    val updateType = editor.readByte()
+    val batch = editor.readInt()
+
+    updateType match {
+      case BEGIN => executeBegin(batch, location)
+      case COMMIT => executeCommit(batch, null, location)
+      case _ =>
+        val update = decode(location, updateType, data)
+        executeStore(batch, update, null, location)
+    }
+
+    recoveryCounter += 1
+    databaseRootRecord.setLastUpdateLocation(location)
   }
 
 
-//  def incrementalRecover() = {
-//    try {
-//      recovering = true
-//      if (nextRecoveryPosition == null) {
-//        if (lastRecoveryPosition == null) {
-//          nextRecoveryPosition = getRecoveryPosition()
-//        } else {
-//          nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
-//        }
-//      }
-//      while (nextRecoveryPosition != null) {
-//        lastRecoveryPosition = nextRecoveryPosition
-//        rootEntity.setLastUpdate(lastRecoveryPosition)
-//        val message = load(lastRecoveryPosition)
-//        val location = lastRecoveryPosition
-//
-//        withTx {tx =>
-//          updateIndex(tx, message.toType(), (MessageBuffer) message, location)
-//        }
-//        nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition)
-//      }
-//    } finally {
-//      recovering = false
-//    }
-//  }
-
-
-  def getRecoveryPosition(): Location = {
-//    if (rootEntity.getLastUpdate() != null) {
-//      // Start replay at the record after the last one recorded in the
-//      // index file.
-//      return journal.getNextLocation(rootEntity.getLastUpdate());
-//    }
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Methods for Journal access
+  //
+  /////////////////////////////////////////////////////////////////////
 
-    // This loads the first position.
-    return journal.getNextLocation(null);
+  private def append(data: Buffer)(cb: (Location) => Unit): Unit = {
+    val start = System.currentTimeMillis()
+    try {
+      journal.write(data, new JournalCallback() {
+        def success(location: Location) = {
+          cb(location)
+        }
+      })
+    } finally {
+      val end = System.currentTimeMillis()
+      if (end - start > 1000) {
+        warn("KahaDB long enqueue time: Journal add took: " + (end - start) + " ms")
+      }
+    }
   }
 
+  def read(location: Location) = {
+    var data = journal.read(location)
+    val editor = data.bigEndianEditor
+    val updateType = editor.readByte()
+    (updateType, data)
+  }
 
-  private var batches = new HashMap[Int, ListBuffer[Update]]()
-  private case class Update(update: TypeCreatable, location: Location)
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Methods that execute updates stored in the journal by indexing them
+  // Used both in normal operation and durring recovery.
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  private def executeBegin(batch: Int, location: Location): Unit = {
+    assert(batches.get(batch).isEmpty)
+    batches.put(batch, (location, ListBuffer()))
+  }
+
+  private def executeCommit(batch: Int, onComplete: Runnable, location: Location): Unit = {
+    // apply all the updates in the batch as a single unit of work.
+    batches.remove(batch) match {
+      case Some((_, updates)) =>
+        // When recovering.. we only want to redo updates that committed
+        // after the last update location.
+        if (!recovering || isAfterLastUpdateLocation(location)) {
+          withTx { tx =>
+            // index the updates
+              updates.foreach {
+                update =>
+                  index(tx, update.update, update.location)
+              }
+              updateLocations(tx, location)
+          }
+        }
+      case None =>
+        // when recovering..  we are more lax due recovery starting
+        // in the middle of a stream of in progress batches
+        assert(!recovering)
+    }
+    if (onComplete != null) {
+      onComplete.run
+    }
+  }
+
+  private def executeRollback(batch: Int, onComplete: Runnable, location: Location): Unit = {
+    // apply all the updates in the batch as a single unit of work.
+    batches.remove(batch) match {
+      case Some((_, _)) =>
+        if (!recovering || isAfterLastUpdateLocation(location)) {
+          withTx { tx =>
+              updateLocations(tx, location)
+          }
+        }
+      case None =>
+        // when recovering..  we are more lax due recovery starting
+        // in the middle of a stream of in progress batches
+        assert(!recovering)
+    }
+    if (onComplete != null) {
+      onComplete.run
+    }
+  }
 
-  private def store(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = {
+  private def executeStore(batch: Int, update: TypeCreatable, onComplete: Runnable, location: Location): Unit = {
     if (batch == -1) {
-      // update is not part of the batch.. apply it now.
-      withTx {tx =>
-        store(tx, update, location)
+      // update is not part of the batch..
+
+      // When recovering.. we only want to redo updates that happen
+      // after the last update location.
+      if (!recovering || isAfterLastUpdateLocation(location)) {
+        withTx { tx =>
+          // index the update now.
+            index(tx, update, location)
+            updateLocations(tx, location)
+        }
       }
+
       if (onComplete != null) {
         onComplete.run
       }
     } else {
+
+      // only the commit/rollback in batch can have an onCompelte handler
+      assert(onComplete == null)
+
       // if the update was part of a batch don't apply till the batch is committed.
       batches.get(batch) match {
-        case Some(updates)=> updates += Update(update, location)
+        case Some((_, updates)) =>
+          updates += Update(update, location)
         case None =>
+          // when recovering..  we are more lax due recovery starting
+          // in the middle of a stream of in progress batches
+          assert(!recovering)
       }
     }
   }
 
-  private def begin(batch: Int, location: Location): Unit = {
-    assert( batches.get(batch).isEmpty )
-    batches.put(batch, ListBuffer())
+
+  private def index(tx: Transaction, update: TypeCreatable, location: Location): Unit = {
+
+    object Process extends TxHelper(tx) {
+      import JavaConversions._
+
+      def apply(x: AddMessage.Getter): Unit = {
+
+        val key = x.getMessageKey()
+        if (key > databaseRootRecord.getLastMessageKey) {
+          databaseRootRecord.setLastMessageKey(key)
+        }
+
+        val prevLocation = messageKeyIndex.put(key, location)
+        if (prevLocation != null) {
+          // Message existed.. undo the index update we just did. Chances
+          // are it's a transaction replay.
+          messageKeyIndex.put(key, prevLocation)
+          if (location == prevLocation) {
+            warn("Message replay detected for: %d", key)
+          } else {
+            error("Message replay with different location for: %d", key)
+          }
+        } else {
+          val fileId:java.lang.Integer = location.getDataFileId()
+          addAndGet(dataFileRefIndex, fileId, 1)
+        }
+      }
+
+      def removeMessage(key:Long) = {
+        val location = messageKeyIndex.remove(key)
+        if (location != null) {
+          val fileId:java.lang.Integer = location.getDataFileId()
+          addAndGet(dataFileRefIndex, fileId, -1)
+        } else {
+          error("Cannot remove message, it did not exist: %d", key)
+        }
+      }
+
+      def apply(x: AddQueue.Getter): Unit = {
+        if (queueIndex.get(x.getKey) == null) {
+          val queueRecord = new QueueRootRecord.Bean
+          queueRecord.setEntryIndexPage(alloc(QUEUE_ENTRY_INDEX_FACTORY))
+          queueRecord.setTrackingIndexPage(alloc(QUEUE_TRACKING_INDEX_FACTORY))
+          queueRecord.setInfo(x)
+          queueIndex.put(x.getKey, queueRecord.freeze)
+        }
+      }
+
+      def apply(x: RemoveQueue.Getter): Unit = {
+        val queueRecord = queueIndex.remove(x.getKey)
+        if (queueRecord != null) {
+          queueEntryIndex(queueRecord).destroy
+          queueTrackingIndex(queueRecord).destroy
+        }
+      }
+
+      def apply(x: AddQueueEntry.Getter): Unit = {
+        val queueKey = x.getQueueKey
+        val queueRecord = queueIndex.get(queueKey)
+        if (queueRecord != null) {
+          val trackingIndex = queueTrackingIndex(queueRecord)
+          val entryIndex = queueEntryIndex(queueRecord)
+
+          // a message can only appear once in a queue (for now).. perhaps we should
+          // relax this constraint.
+          val messageKey = x.getMessageKey
+          val queueSeq = x.getQueueSeq
+
+          val existing = trackingIndex.put(messageKey, queueSeq)
+          if (existing == null) {
+            val previous = entryIndex.put(queueSeq, x.freeze)
+            if (previous == null) {
+
+              val queueRecordUpdate = queueRecord.copy
+              queueRecordUpdate.setCount(queueRecord.getCount + 1)
+              queueRecordUpdate.setSize(queueRecord.getSize + x.getSize)
+              queueIndex.put(queueKey, queueRecordUpdate.freeze)
+
+              addAndGet(messageRefsIndex, new java.lang.Long(messageKey), 1)
+            } else {
+              error("Duplicate queue entry seq %d", x.getQueueSeq)
+            }
+          } else {
+            error("Duplicate queue entry message %d", x.getMessageKey)
+          }
+        } else {
+          error("Queue not found: %d", x.getQueueKey)
+        }
+      }
+
+      def apply(x: RemoveQueueEntry.Getter): Unit = {
+        val queueKey = x.getQueueKey
+        val queueRecord = queueIndex.get(queueKey)
+        if (queueRecord != null) {
+          val trackingIndex = queueTrackingIndex(queueRecord)
+          val entryIndex = queueEntryIndex(queueRecord)
+
+          val queueSeq = x.getQueueSeq
+          val queueEntry = entryIndex.remove(queueSeq)
+          if (queueEntry != null) {
+            val messageKey = queueEntry.getMessageKey
+            val existing = trackingIndex.remove(messageKey)
+            if (existing == null) {
+              error("Tracking entry not found for message %d", queueEntry.getMessageKey)
+            }
+            if( addAndGet(messageRefsIndex, new java.lang.Long(messageKey), -1) == 0 ) {
+              // message is no long referenced.. we can remove it..
+              removeMessage(messageKey)
+            }
+          } else {
+            error("Queue entry not found for seq %d", x.getQueueSeq)
+          }
+        } else {
+          error("Queue not found: %d", x.getQueueKey)
+        }
+      }
+
+      def apply(x: Purge.Getter): Unit = {
+
+        // Remove all the queues...
+        queueIndex.iterator.map {
+          entry =>
+            entry.getKey
+        }.foreach {
+          key =>
+            apply(new RemoveQueue.Bean().setKey(key.intValue))
+        }
+
+        // Remove stored messages...
+        messageKeyIndex.clear
+        messageRefsIndex.clear
+        dataFileRefIndex.clear
+        databaseRootRecord.setLastMessageKey(0)
+
+        cleanup(tx);
+        info("Store purged.");
+      }
+
+      def apply(x: AddTrace.Getter): Unit = {
+        // trace messages are informational messages in the journal used to log
+        // historical info about store state.  They don't update the indexes.
+      }
+    }
+
+    update match {
+      case x: AddMessage.Getter => Process(x)
+      case x: AddQueueEntry.Getter => Process(x)
+      case x: RemoveQueueEntry.Getter => Process(x)
+
+      case x: AddQueue.Getter => Process(x)
+      case x: RemoveQueue.Getter => Process(x)
+
+      case x: AddTrace.Getter => Process(x)
+      case x: Purge.Getter => Process(x)
+
+      case x: AddSubscription.Getter =>
+      case x: RemoveSubscription.Getter =>
+
+      case x: AddMap.Getter =>
+      case x: RemoveMap.Getter =>
+      case x: PutMapEntry.Getter =>
+      case x: RemoveMapEntry.Getter =>
+
+      case x: OpenStream.Getter =>
+      case x: WriteStream.Getter =>
+      case x: CloseStream.Getter =>
+      case x: RemoveStream.Getter =>
+    }
   }
 
-  private def commit(batch: Int, onComplete: Runnable, location: Location): Unit = {
-    // apply all the updates in the batch as a single unit of work.
-    withTx {tx =>
-      batches.get(batch) match {
-        case Some(updates) =>
-          updates.foreach {update =>
-            store(tx, update.update, update.location)
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Periodic Maintance
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  def schedualFlush(): Unit = {
+    def try_flush() = {
+      if (hawtDBStore.serviceState.isStarted) {
+        hawtDBStore.executor_pool {
+          flush
+          schedualFlush
+        }
+      }
+    }
+    dispatchQueue.dispatchAfter(config.indexFlushInterval, TimeUnit.MILLISECONDS, ^ {try_flush})
+  }
+
+  def flush() = {
+    val start = System.currentTimeMillis()
+    pageFile.flush
+    val end = System.currentTimeMillis()
+    if (end - start > 1000) {
+      warn("Index flush took %,.3f seconds" + ((end - start) / 1000.0f))
+    }
+  }
+
+  def schedualCleanup(): Unit = {
+    def try_cleanup() = {
+      if (hawtDBStore.serviceState.isStarted) {
+        hawtDBStore.executor_pool {
+          withTx {tx =>
+            cleanup(tx)
           }
-          if (onComplete != null) {
-            onComplete.run
+          schedualCleanup
+        }
+      }
+    }
+    dispatchQueue.dispatchAfter(config.cleanupInterval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
+  }
+
+  /**
+   * @param tx
+   * @throws IOException
+   */
+  def cleanup(tx:Transaction) = {
+    val helper = new TxHelper(tx)
+    import JavaConversions._
+    import helper._
+
+    debug("Cleanup started.")
+    val gcCandidateSet = new TreeSet[Integer](journal.getFileMap().keySet())
+
+    // Don't cleanup locked data files
+    if (lockedDatatFiles != null) {
+      gcCandidateSet.removeAll(lockedDatatFiles)
+    }
+
+    // Don't GC files that we will need for recovery..
+    val upto = if (databaseRootRecord.hasFirstInProgressBatch) {
+      Some(databaseRootRecord.getFirstInProgressBatch.getDataFileId)
+    } else {
+      if (databaseRootRecord.hasLastUpdateLocation) {
+        Some(databaseRootRecord.getLastUpdateLocation.getDataFileId)
+      } else {
+        None
+      }
+    }
+
+    upto match {
+      case Some(dataFile) =>
+        var done = false
+        while (!done && !gcCandidateSet.isEmpty()) {
+          val last = gcCandidateSet.last()
+          if (last.intValue >= dataFile) {
+            gcCandidateSet.remove(last)
+          } else {
+            done = true
           }
-        case None =>
+        }
+
+      case None =>
+    }
+
+    if (!gcCandidateSet.isEmpty() ) {
+      dataFileRefIndex.iterator.foreach { entry =>
+        gcCandidateSet.remove(entry.getKey)
+      }
+      if (!gcCandidateSet.isEmpty()) {
+        debug("Cleanup removing the data files: %s", gcCandidateSet)
+        journal.removeDataFiles(gcCandidateSet)
+      }
+    }
+    debug("Cleanup done.")
+  }
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Helper Methods / Classes
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  private case class Update(update: TypeCreatable, location: Location)
+
+  private class TxHelper(private val _tx: Transaction) {
+    lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage)
+    lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, databaseRootRecord.getDataFileRefIndexPage)
+    lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageKeyIndexPage)
+    lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, databaseRootRecord.getMessageRefsIndexPage)
+    lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, databaseRootRecord.getSubscriptionIndexPage)
+
+    def addAndGet[K](index:SortedIndex[K, Integer], key:K, amount:Int):Int = {
+      var counter = index.get(key)
+      if( counter == null ) {
+        if( amount!=0 ) {
+          index.put(key, amount)
+        }
+        amount
+      } else {
+        val update = counter.intValue + amount
+        if( update == 0 ) {
+          index.remove(key)
+        } else {
+          index.put(key, update)
+        }
+        update
       }
     }
+
+    def queueEntryIndex(root: QueueRootRecord.Getter) = QUEUE_ENTRY_INDEX_FACTORY.open(_tx, root.getEntryIndexPage)
+
+    def queueTrackingIndex(root: QueueRootRecord.Getter) = QUEUE_TRACKING_INDEX_FACTORY.open(_tx, root.getTrackingIndexPage)
+
+    def alloc(factory: IndexFactory[_, _]) = {
+      val rc = _tx.alloc
+      factory.create(_tx, rc)
+      rc
+    }
   }
 
-  private def store(tx: Transaction, update: TypeCreatable, location: Location): Unit = {
+  private def withTx[T](func: (Transaction) => T): T = {
+    val tx = pageFile.tx
+    var ok = false
+    try {
+      val rc = func(tx)
+      ok = true
+      rc
+    } finally {
+      if (ok) {
+        tx.commit
+      } else {
+        tx.rollback
+      }
+    }
+  }
 
+  // Gets the next batch id.. after a while we may wrap around
+  // start producing batch ids from zero
+  val next_batch_id = {
+    var rc = next_batch_counter.getAndIncrement
+    while (rc < 0) {
+      // We just wrapped around.. reset the counter to 0
+      // Use a CAS operation so that only 1 thread resets the counter
+      next_batch_counter.compareAndSet(rc + 1, 0)
+      rc = next_batch_counter.getAndIncrement
+    }
+    rc
   }
 
+  private def isAfterLastUpdateLocation(location: Location) = {
+    val lastUpdate: Location = databaseRootRecord.getLastUpdateLocation
+    lastUpdate.compareTo(location) < 0
+  }
 
+  private def updateLocations(tx: Transaction, location: Location): Unit = {
+    databaseRootRecord.setLastUpdateLocation(location)
+    if (batches.isEmpty) {
+      databaseRootRecord.clearFirstInProgressBatch
+    } else {
+      databaseRootRecord.setFirstInProgressBatch(batches.head._2._1)
+    }
+    tx.put(DATABASE_ROOT_RECORD_ACCESSOR, 0, databaseRootRecord.freeze)
+    databaseRootRecord = databaseRootRecord.copy
+  }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961128&r1=961127&r2=961128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:06:47 2010
@@ -74,9 +74,9 @@ class HawtDBStore extends Store with Bas
   var next_queue_key = new AtomicLong(0)
   var next_msg_key = new AtomicLong(0)
 
-  protected var executor_pool:ExecutorService = _
+  var executor_pool:ExecutorService = _
   var config:HawtDBStoreDTO = defaultConfig
-  val client = new HawtDBClient
+  val client = new HawtDBClient(this)
 
   def configure(config: StoreDTO, reporter: Reporter) = configure(config.asInstanceOf[HawtDBStoreDTO], reporter)
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul  7 04:06:47 2010
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.hawtdb
+
+import model._
+import model.Type.TypeCreatable
+import org.apache.activemq.apollo.store.{MessageRecord, QueueRecord, QueueEntryRecord}
+import org.fusesource.hawtbuf.codec._
+import org.fusesource.hawtbuf.{UTF8Buffer, AsciiBuffer, Buffer}
+import java.io.{IOException, DataInput, DataOutput}
+import org.fusesource.hawtdb.internal.journal.{LocationCodec, Location}
+import org.fusesource.hawtdb.api._
+import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessage}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object Helpers {
+
+  val QUEUE_RECORD_CODEC = new VariableCodec[QueueEntryRecord]() {
+    def decode(dataIn: DataInput): QueueEntryRecord = {
+      val rc = new QueueEntryRecord();
+      rc.queueKey = dataIn.readLong();
+      rc.messageKey = dataIn.readLong();
+      rc.size = dataIn.readInt();
+      //            if (dataIn.readBoolean()) {
+      //                rc.setTte(dataIn.readLong());
+      //            }
+      rc.redeliveries = dataIn.readShort();
+      if (dataIn.readBoolean()) {
+        rc.attachment = BUFFER_CODEC.decode(dataIn);
+      }
+      return rc;
+    }
+
+    def encode(value: QueueEntryRecord, dataOut: DataOutput) = {
+      dataOut.writeLong(value.queueKey);
+      dataOut.writeLong(value.messageKey);
+      dataOut.writeInt(value.size);
+      //            if (value.getTte() >= 0) {
+      //                dataOut.writeBoolean(true);
+      //                dataOut.writeLong(value.getTte());
+      //            } else {
+      //                dataOut.writeBoolean(false);
+      //            }
+      dataOut.writeShort(value.redeliveries);
+      if (value.attachment != null) {
+        dataOut.writeBoolean(true);
+        BUFFER_CODEC.encode(value.attachment, dataOut);
+      } else {
+        dataOut.writeBoolean(false);
+      }
+    }
+
+    def estimatedSize(value: QueueEntryRecord) = throw new UnsupportedOperationException()
+  }
+
+  val QUEUE_DESCRIPTOR_CODEC = new VariableCodec[QueueRecord]() {
+    def decode(dataIn: DataInput): QueueRecord = {
+      val record = new QueueRecord();
+      record.queueType = ASCII_BUFFER_CODEC.decode(dataIn);
+      record.name = ASCII_BUFFER_CODEC.decode(dataIn);
+      //            if (dataIn.readBoolean()) {
+      //                record.parent = ASCII_BUFFER_MARSHALLER.readPayload(dataIn)
+      //                record.setPartitionId(dataIn.readInt());
+      //            }
+      return record;
+    }
+
+    def encode(value: QueueRecord, dataOut: DataOutput) = {
+      ASCII_BUFFER_CODEC.encode(value.queueType, dataOut);
+      ASCII_BUFFER_CODEC.encode(value.name, dataOut);
+      //            if (value.parent != null) {
+      //                dataOut.writeBoolean(true);
+      //                ASCII_BUFFER_MARSHALLER.writePayload(value.parent, dataOut);
+      //                dataOut.writeInt(value.getPartitionKey());
+      //            } else {
+      //                dataOut.writeBoolean(false);
+      //            }
+    }
+
+    def estimatedSize(value: QueueRecord) = throw new UnsupportedOperationException()
+  };
+
+  val ASCII_BUFFER_CODEC = AsciiBufferCodec.INSTANCE;
+  val BUFFER_CODEC = BufferCodec.INSTANCE;
+
+
+  implicit def toMessageRecord(pb: AddMessage.Getter): MessageRecord = {
+    val rc = new MessageRecord
+    rc.protocol = pb.getProtocol
+    rc.size = pb.getSize
+    rc.value = pb.getValue
+    rc.stream = pb.getStreamKey
+    rc.expiration = pb.getExpiration
+    rc
+  }
+
+  implicit def fromMessageRecord(v: MessageRecord): AddMessage.Bean = {
+    val pb = new AddMessage.Bean
+    pb.setProtocol(v.protocol)
+    pb.setSize(v.size)
+    pb.setValue(v.value)
+    pb.setStreamKey(v.stream)
+    pb.setExpiration(v.expiration)
+    pb
+  }
+
+  implicit def toQueueEntryRecord(pb: AddQueueEntry.Getter): QueueEntryRecord = {
+    val rc = new QueueEntryRecord
+    rc.queueKey = pb.getQueueKey
+    rc.queueSeq = pb.getQueueSeq
+    rc.messageKey = pb.getMessageKey
+    rc.attachment = pb.getAttachment
+    rc.size = pb.getSize
+    rc.redeliveries = pb.getRedeliveries.toShort
+    rc
+  }
+
+  implicit def fromQueueEntryRecord(v: QueueEntryRecord): AddQueueEntry.Bean = {
+    val pb = new AddQueueEntry.Bean
+    pb.setQueueKey(v.queueKey)
+    pb.setQueueSeq(v.queueSeq)
+    pb.setMessageKey(v.messageKey)
+    pb.setAttachment(v.attachment)
+    pb.setSize(v.size)
+    pb.setRedeliveries(v.redeliveries)
+    pb
+  }
+
+  implicit def toLocation(value: Long): Location = {
+    val temp = new Buffer(8)
+    val editor = temp.bigEndianEditor
+    editor.writeLong(value)
+    temp.reset
+    new Location(editor.readInt(), editor.readInt())
+  }
+  
+  implicit def fromLocation(value: Location):Long = {
+    val temp = new Buffer(8)
+    val editor = temp.bigEndianEditor
+    editor.writeInt(value.getDataFileId)
+    editor.writeInt(value.getOffset)
+    temp.reset
+    editor.readLong
+  }
+
+  implicit def toAsciiBuffer(value:String):AsciiBuffer = new AsciiBuffer(value)
+  implicit def toUTF8Buffer(value:String):UTF8Buffer = new UTF8Buffer(value)
+
+  type PB = PBMessage[_ <: PBMessage[_, _], _ <: MessageBuffer[_, _]]
+  implicit def toPBMessage(value: TypeCreatable): PB = value.asInstanceOf[PB]
+
+
+  val DATABASE_ROOT_RECORD_ACCESSOR = new CodecPagedAccessor[DatabaseRootRecord.Buffer](DatabaseRootRecord.FRAMED_CODEC);
+
+  def decode(location: Location, updateType: Int, value: Buffer) = {
+    val t = Type.valueOf(updateType);
+    if (t == null) {
+      throw new IOException("Could not load journal record. Invalid type at location: " + location);
+    }
+    t.parseUnframed(value).asInstanceOf[TypeCreatable]
+  }
+
+  //
+  // Index factories...
+  //
+
+  import java.{lang => jl}
+
+  // maps message key -> Journal Location
+  val MESSAGE_KEY_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, Location]();
+  MESSAGE_KEY_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE);
+  MESSAGE_KEY_INDEX_FACTORY.setValueCodec(LocationCodec.INSTANCE);
+  MESSAGE_KEY_INDEX_FACTORY.setDeferredEncoding(true);
+
+  // maps Journal Data File Id -> Ref Counter
+  val DATA_FILE_REF_INDEX_FACTORY = new BTreeIndexFactory[jl.Integer, jl.Integer]();
+  DATA_FILE_REF_INDEX_FACTORY.setKeyCodec(VarIntegerCodec.INSTANCE);
+  DATA_FILE_REF_INDEX_FACTORY.setValueCodec(VarIntegerCodec.INSTANCE);
+  DATA_FILE_REF_INDEX_FACTORY.setDeferredEncoding(true);
+
+  // maps message key -> Ref Counter
+  val MESSAGE_REFS_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, jl.Integer]();
+  MESSAGE_REFS_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE);
+  MESSAGE_REFS_INDEX_FACTORY.setValueCodec(VarIntegerCodec.INSTANCE);
+  MESSAGE_REFS_INDEX_FACTORY.setDeferredEncoding(true);
+
+  // maps queue key -> QueueRootRecord
+  val QUEUE_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, QueueRootRecord.Buffer]();
+  QUEUE_INDEX_FACTORY.setKeyCodec(VarLongCodec.INSTANCE);
+  QUEUE_INDEX_FACTORY.setValueCodec(QueueRootRecord.FRAMED_CODEC);
+  QUEUE_INDEX_FACTORY.setDeferredEncoding(true);
+
+  // maps queue seq -> AddQueueEntry
+  val QUEUE_ENTRY_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, AddQueueEntry.Buffer]();
+  QUEUE_ENTRY_INDEX_FACTORY.setKeyCodec(VarLongCodec.INSTANCE);
+  QUEUE_ENTRY_INDEX_FACTORY.setValueCodec(AddQueueEntry.FRAMED_CODEC);
+  QUEUE_ENTRY_INDEX_FACTORY.setDeferredEncoding(true);
+
+  // maps message key -> queue seq
+  val QUEUE_TRACKING_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, jl.Long]();
+  QUEUE_TRACKING_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE);
+  QUEUE_TRACKING_INDEX_FACTORY.setValueCodec(VarLongCodec.INSTANCE);
+  QUEUE_TRACKING_INDEX_FACTORY.setDeferredEncoding(true);
+
+  val SUBSCRIPTIONS_INDEX_FACTORY = new BTreeIndexFactory[AsciiBuffer, AddSubscription.Buffer]();
+  SUBSCRIPTIONS_INDEX_FACTORY.setKeyCodec(AsciiBufferCodec.INSTANCE);
+  SUBSCRIPTIONS_INDEX_FACTORY.setValueCodec(AddSubscription.FRAMED_CODEC);
+  SUBSCRIPTIONS_INDEX_FACTORY.setDeferredEncoding(true);
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala?rev=961128&r1=961127&r2=961128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.scala Wed Jul  7 04:06:47 2010
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.broker.store.hawtdb
 
-import model.RootRecord
 import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
 import org.fusesource.hawtdb.api._
 import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory, PBMessage}
@@ -37,37 +36,7 @@ class DestinationEntity {
 //}
 
 object RootEntity {
-//  val messageKeyIndexFactory = new BTreeIndexFactory[Long, Long]();
-//  val locationIndexFactory = new BTreeIndexFactory[Integer, Long]();
-//  val messageRefsIndexFactory = new BTreeIndexFactory[Long, Long]();
-//  val destinationIndexFactory = new BTreeIndexFactory[Long, DestinationEntity]();
-//  val subscriptionIndexFactory = new BTreeIndexFactory[AsciiBuffer, Buffer]();
-//  val mapIndexFactory = new BTreeIndexFactory[AsciiBuffer, Integer]();
-//  val mapInstanceIndexFactory = new BTreeIndexFactory[AsciiBuffer, Buffer]();
-//
-//  messageKeyIndexFactory.setKeyCodec(LongCodec.INSTANCE);
-//  messageKeyIndexFactory.setValueCodec(LongCodec.INSTANCE);
-//  messageKeyIndexFactory.setDeferredEncoding(true);
-//
-//  locationIndexFactory.setKeyCodec(IntegerCodec.INSTANCE);
-//  locationIndexFactory.setValueCodec(LongCodec.INSTANCE);
-//  locationIndexFactory.setDeferredEncoding(true);
-//
-//  messageRefsIndexFactory.setKeyCodec(LongCodec.INSTANCE);
-//  messageRefsIndexFactory.setValueCodec(LongCodec.INSTANCE);
-//  messageRefsIndexFactory.setDeferredEncoding(true);
-//
-//  destinationIndexFactory.setKeyCodec(LongCodec.INSTANCE);
-//  destinationIndexFactory.setValueCodec(DestinationEntity.MARSHALLER);
-//  destinationIndexFactory.setDeferredEncoding(true);
-//
-//  subscriptionIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
-//  subscriptionIndexFactory.setValueCodec(Codecs.BUFFER_CODEC);
-//  subscriptionIndexFactory.setDeferredEncoding(true);
-//
-//  mapIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
-//  mapIndexFactory.setValueCodec(IntegerCodec.INSTANCE);
-//  mapIndexFactory.setDeferredEncoding(true);
+
 //
 //  val DATA_ENCODER_DECODER = PBEncoderDecoder(RootRecord.FACTORY)
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java?rev=961128&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBManagerBenchmark.java Wed Jul  7 04:06:47 2010
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.hawtdb;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.apollo.store.MessageRecord;
+import org.apache.activemq.apollo.store.QueueRecord;
+import org.apache.activemq.apollo.store.QueueStatus;
+import org.apache.activemq.apollo.store.QueueEntryRecord;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class HawtDBManagerBenchmark extends TestCase {
+
+//    private static int PERFORMANCE_SAMPLES = 50;
+//    private static boolean SYNC_TO_DISK = true;
+//    private static final boolean USE_SHARED_WRITER = true;
+//
+//    private HawtDBManager store;
+//    private QueueRecord queueId;
+//    private AtomicLong queueKey = new AtomicLong(0);
+//
+//    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+//    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+//
+//    protected ArrayList<Consumer> consumers = new ArrayList<Consumer>();
+//    protected ArrayList<Producer> producers = new ArrayList<Producer>();
+//
+//    protected HawtDBManager createStore() {
+//        HawtDBManager rc = new HawtDBManager();
+//        rc.setStoreDirectory(new File("target/test-data/kahadb-store-performance"));
+//        rc.setDeleteAllMessages(true);
+//        return rc;
+//    }
+//
+//    private SharedWriter writer = null;
+//
+//    private Semaphore enqueuePermits;
+//    private Semaphore dequeuePermits;
+//
+//    @Override
+//    protected void setUp() throws Exception {
+//        store = createStore();
+//        //store.setDeleteAllMessages(false);
+//        store.start();
+//
+//        if (USE_SHARED_WRITER) {
+//            writer = new SharedWriter();
+//            writer.start();
+//        }
+//
+//        enqueuePermits = new Semaphore(20000000);
+//        dequeuePermits = new Semaphore(0);
+//
+//        queueId = new QueueRecord();
+//        queueId.name = new AsciiBuffer("test");
+//        store.execute(new VoidCallback<Exception>() {
+//            @Override
+//            public void run(HawtDBSession session) throws Exception {
+//                session.queueAdd(queueId);
+//            }
+//        }, null);
+//
+//        store.execute(new VoidCallback<Exception>() {
+//            @Override
+//            public void run(HawtDBSession session) throws Exception {
+//                Iterator<QueueStatus> qqrs = session.queueList(queueId, 1);
+//                Assert.assertTrue(qqrs.hasNext());
+//                QueueStatus qqr = qqrs.next();
+//                if(qqr.size > 0)
+//                {
+//                    queueKey.set(qqr.last + 1);
+//                    System.out.println("Recovered queue: " + qqr.record.name + " with " + qqr.count + " messages");
+//                }
+//            }
+//        }, null);
+//    }
+//
+//    @Override
+//    protected void tearDown() throws Exception {
+//        for (Consumer c : consumers) {
+//            c.stop();
+//        }
+//        consumers.clear();
+//        for (Producer p : producers) {
+//            p.stop();
+//        }
+//        producers.clear();
+//
+//        if (writer != null) {
+//            writer.stop();
+//        }
+//
+//        if (store != null) {
+//            store.stop();
+//        }
+//    }
+//
+//    class SharedWriter implements Runnable {
+//        LinkedBlockingQueue<SharedQueueOp> queue = new LinkedBlockingQueue<SharedQueueOp>(1000);
+//        private Thread thread;
+//        private AtomicBoolean stopped = new AtomicBoolean();
+//
+//        public void start() {
+//            thread = new Thread(this, "Writer");
+//            thread.start();
+//        }
+//
+//        public void stop() throws InterruptedException {
+//            stopped.set(true);
+//
+//            //Add an op to trigger shutdown:
+//            SharedQueueOp op = new SharedQueueOp() {
+//                public void run() {
+//                }
+//            };
+//            op.op = new VoidCallback<Exception>() {
+//
+//                @Override
+//                public void run(HawtDBSession session) throws Exception {
+//                    // TODO Auto-generated method stub
+//                }
+//            };
+//
+//            queue.put(op);
+//            thread.join();
+//        }
+//
+//        public void run() {
+//            HawtDBSession session = store.getSession();
+//            try {
+//                LinkedList<Runnable> processed = new LinkedList<Runnable>();
+//                while (!stopped.get()) {
+//                    SharedQueueOp op = queue.take();
+//                    session.acquireLock();
+//                    int ops = 0;
+//                    while (op != null && ops < 1000) {
+//                        op.op.execute(session);
+//                        processed.add(op);
+//                        op = queue.poll();
+//                        ops++;
+//                    }
+//
+//                    session.commit();
+//                    session.releaseLock();
+//
+//                    if (SYNC_TO_DISK) {
+//                        store.flush();
+//                    }
+//
+//                    for (Runnable r : processed) {
+//                        r.run();
+//                    }
+//                    processed.clear();
+//                }
+//
+//            } catch (InterruptedException e) {
+//                if (!stopped.get()) {
+//                    e.printStackTrace();
+//                }
+//                return;
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//                return;
+//            }
+//        }
+//
+//        public void addOp(SharedQueueOp op) throws InterruptedException {
+//            queue.put(op);
+//        }
+//    }
+//
+//    abstract class SharedQueueOp implements Runnable {
+//        VoidCallback<Exception> op;
+//    }
+//
+//    class Producer implements Runnable {
+//        private Thread thread;
+//        private AtomicBoolean stopped = new AtomicBoolean();
+//        private String name;
+//        protected final MetricCounter rate = new MetricCounter();
+//        private long sleep;
+//
+//        public Producer(String name) {
+//            this.name = name;
+//        }
+//
+//        public void start() {
+//            rate.name("Producer " + name + " Rate");
+//            totalProducerRate.add(rate);
+//            thread = new Thread(this, "Producer" + name);
+//            thread.start();
+//        }
+//
+//        public void stop() throws InterruptedException {
+//            stopped.set(true);
+//            while (enqueuePermits.hasQueuedThreads()) {
+//                enqueuePermits.release();
+//            }
+//            thread.join();
+//        }
+//
+//        public void run() {
+//            try {
+//                Buffer buffer = new Buffer(new byte[1024]);
+//                for (long i = 0; !stopped.get(); i++) {
+//
+//                    enqueuePermits.acquire();
+//
+//                    final MessageRecord messageRecord = new MessageRecord();
+//                    messageRecord.key = store.allocateStoreTracking();
+//                    messageRecord.protocol = new AsciiBuffer("encoding");
+//                    messageRecord.value = buffer;
+//                    messageRecord.size = buffer.getLength();
+//
+//                    SharedQueueOp op = new SharedQueueOp() {
+//                        public void run() {
+//                            rate.increment();
+//                        }
+//                    };
+//
+//                    op.op = new VoidCallback<Exception>() {
+//                        @Override
+//                        public void run(HawtDBSession session) throws Exception {
+//                            session.messageAdd(messageRecord);
+//                            QueueEntryRecord queueEntryRecord = new QueueEntryRecord();
+//                            queueEntryRecord.messageKey = messageRecord.key;
+//                            queueEntryRecord.queueKey = queueKey.incrementAndGet();
+//                            queueEntryRecord.size = messageRecord.size;
+//                            session.queueAddMessage(queueId, queueEntryRecord);
+//                            dequeuePermits.release();
+//                        }
+//                    };
+//
+//                    if (!USE_SHARED_WRITER) {
+//                        store.execute(op.op, op);
+//
+//                        if (SYNC_TO_DISK) {
+//                            store.flush();
+//                        }
+//
+//                    } else {
+//                        writer.addOp(op);
+//                    }
+//
+//                    if (sleep > 0) {
+//                        Thread.sleep(sleep);
+//                    }
+//                }
+//            } catch (InterruptedException e) {
+//                if (!stopped.get()) {
+//                    e.printStackTrace();
+//                }
+//                return;
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+//        }
+//    }
+//
+//    class Consumer implements Runnable {
+//        private Thread thread;
+//        private AtomicBoolean stopped = new AtomicBoolean();
+//        protected final MetricCounter rate = new MetricCounter();
+//        private String name;
+//        private final Semaphore queryWait = new Semaphore(0);
+//
+//        public Consumer(String name) {
+//            this.name = name;
+//        }
+//
+//        public void start() {
+//            rate.name("Consumer " + name + " Rate");
+//            totalConsumerRate.add(rate);
+//            thread = new Thread(this, "Consumer " + name);
+//            thread.start();
+//        }
+//
+//        public void stop() throws InterruptedException {
+//            stopped.set(true);
+//            queryWait.release();
+//            thread.join();
+//        }
+//
+//        public void run() {
+//            try {
+//                while (!stopped.get()) {
+//                    final ArrayList<MessageRecord> records = new ArrayList<MessageRecord>(1000);
+//                    SharedQueueOp op = new SharedQueueOp() {
+//                        public void run() {
+//                            rate.increment(records.size());
+//                            enqueuePermits.release(records.size());
+//                            queryWait.release();
+//                        }
+//                    };
+//
+//                    op.op = new VoidCallback<Exception>() {
+//                        @Override
+//                        public void run(HawtDBSession session) throws Exception {
+//                            Iterator<QueueEntryRecord> queueRecords = session.queueListMessagesQueue(queueId, 0L, -1L, 1000);
+//                            for (Iterator<QueueEntryRecord> iterator = queueRecords; iterator.hasNext();) {
+//                                QueueEntryRecord r = iterator.next();
+//                                records.add(session.messageGetRecord(r.messageKey));
+//                                session.queueRemoveMessage(queueId, r.queueKey);
+//                            }
+//                        }
+//                    };
+//
+//                    if (!USE_SHARED_WRITER) {
+//                        store.execute(op.op, op);
+//                        if (SYNC_TO_DISK) {
+//                            store.flush();
+//                        }
+//                    } else {
+//                        writer.addOp(op);
+//                    }
+//
+//                    dequeuePermits.acquire();
+//                    records.clear();
+//                }
+//            } catch (InterruptedException e) {
+//                if (!stopped.get()) {
+//                    e.printStackTrace();
+//                }
+//                return;
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+//        }
+//    }
+//
+//    public void test1_1_0() throws Exception {
+//        startProducers(1);
+//        reportRates();
+//    }
+//
+//
+//    public void test1_1_1() throws Exception {
+//        startProducers(1);
+//        startConsumers(1);
+//        reportRates();
+//    }
+//
+//    public void test10_1_1() throws Exception {
+//        startProducers(10);
+//        startConsumers(1);
+//        reportRates();
+//    }
+//
+//    private void startProducers(int count) {
+//        for (int i = 0; i < count; i++) {
+//            Producer p = new Producer("" + (i + 1));
+//            producers.add(p);
+//            p.start();
+//        }
+//    }
+//
+//    private void startConsumers(int count) {
+//        for (int i = 0; i < count; i++) {
+//            Consumer c = new Consumer("" + (i + 1));
+//            consumers.add(c);
+//            c.start();
+//        }
+//    }
+//
+//    private void reportRates() throws InterruptedException {
+//        System.out.println("Checking rates for test: " + getName());
+//        for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+//            Period p = new Period();
+//            Thread.sleep(1000 * 5);
+//            System.out.println(totalProducerRate.getRateSummary(p));
+//            System.out.println(totalConsumerRate.getRateSummary(p));
+//            totalProducerRate.reset();
+//            totalConsumerRate.reset();
+//        }
+//    }
+
+}
\ No newline at end of file