You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:17:36 UTC

svn commit: r961197 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/ activemq-hawtdb/src/main/proto/ a...

Author: chirino
Date: Wed Jul  7 04:17:35 2010
New Revision: 961197

URL: http://svn.apache.org/viewvc?rev=961197&view=rev
Log:
Adding a new DirectRecordStore to allow the protocols to stream large message out of band (from the point of view of the JVM gc).

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala Wed Jul  7 04:17:35 2010
@@ -50,7 +50,7 @@ class MultiProtocolHandler extends Proto
   override def onTransportCommand(command:Any) = {
 
     if (!command.isInstanceOf[WireFormat]) {
-      throw new ProtocolException("First command should be a WireFormat");
+      throw new ProtocolException("Expected WireFormat");
     }
 
     var wireformat:WireFormat = command.asInstanceOf[WireFormat];
@@ -67,6 +67,8 @@ class MultiProtocolHandler extends Proto
     // replace the current handler with the new one.
     connection.protocol = protocol
     connection.protocolHandler = protocolHandler
+    connection.transport.setWireformat(wireformat)
+    
     connection.transport.suspendRead
     protocolHandler.onTransportConnected
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul  7 04:17:35 2010
@@ -62,7 +62,7 @@ class CassandraClient() {
     rc.protocol = pb.getProtocol
     rc.size = pb.getSize
     rc.value = pb.getValue
-    rc.stream = pb.getStream
+    rc.directKey = pb.getStream
     rc.expiration = pb.getExpiration
     rc
   }
@@ -72,7 +72,7 @@ class CassandraClient() {
     pb.setProtocol(v.protocol)
     pb.setSize(v.size)
     pb.setValue(v.value)
-    pb.setStream(v.stream)
+    pb.setStream(v.directKey)
     pb.setExpiration(v.expiration)
     pb.freeze.toUnframedByteArray
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto Wed Jul  7 04:17:35 2010
@@ -34,10 +34,8 @@ enum Type {
   PUT_MAP_ENTRY = 32;
   REMOVE_MAP_ENTRY = 33;
 
-  OPEN_STREAM = 40;
-  WRITE_STREAM = 41;
-  CLOSE_STREAM = 42;
-  REMOVE_STREAM = 43;
+  ADD_DIRECT = 40;
+  REMOVE_DIRECT = 41;
 
   ADD_SUBSCRIPTION = 50;
   REMOVE_SUBSCRIPTION = 51;
@@ -134,20 +132,16 @@ message RemoveMapEntry {
 }
 
 ///////////////////////////////////////////////////////////////
-// Stream related operations.
+// Direct buffer related operations.
 ///////////////////////////////////////////////////////////////
-message OpenStream {
-  required int64 streamKey=1;
+message AddDirect {
+  required int64 directKey=1;
+  required int32 size=2;
+  required int32 page=3;
 }
-message WriteStream {
-  required int64 streamKey=1;
-  optional bytes data = 2;
-}
-message CloseStream {
-  required int64 streamKey=1;
-}
-message RemoveStream {
-  required int64 streamKey=1;
+
+message RemoveDirect {
+  required int64 directKey=1;
 }
 
 
@@ -167,7 +161,8 @@ message DatabaseRootRecord {
   optional fixed32 queueIndexPage=53;
   optional fixed32 subscriptionIndexPage=54;
   optional fixed32 mapIndexPage=55;
-  
+  optional fixed32 directIndexPage=56;
+
 }
 
 message QueueRootRecord {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul  7 04:17:35 2010
@@ -23,15 +23,9 @@ import model.{AddQueue, AddQueueEntry, A
 import org.apache.activemq.apollo.dto.HawtDBStoreDTO
 import java.io.File
 import java.io.IOException
-import java.util.concurrent.TimeUnit
-import org.fusesource.hawtbuf.proto.MessageBuffer
-import org.fusesource.hawtbuf.proto.PBMessage
 import org.apache.activemq.util.LockFile
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import org.fusesource.hawtdb.internal.journal.{JournalListener, Journal, Location}
-import org.fusesource.hawtdispatch.TaskTracker
-
-import org.fusesource.hawtbuf.AsciiBuffer._
 import org.apache.activemq.broker.store.hawtdb.model.Type._
 import org.apache.activemq.broker.store.hawtdb.model._
 import org.fusesource.hawtbuf._
@@ -44,6 +38,10 @@ import org.fusesource.hawtdb.api._
 import org.apache.activemq.apollo.broker.{DispatchLogging, Log, Logging, BaseService}
 import org.apache.activemq.apollo.util.TimeCounter
 import org.apache.activemq.apollo.store._
+import org.fusesource.hawtdb.api.Paged.SliceType
+import java.util.concurrent.TimeUnit
+
+
 
 object HawtDBClient extends Log {
   val BEGIN = -1
@@ -69,7 +67,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   def dispatchQueue = hawtDBStore.dispatchQueue
 
 
-  private val pageFileFactory = new TxPageFileFactory()
+  private val directFileFactory = new PageFileFactory()
+  private val indexFileFactory = new TxPageFileFactory()
   private var journal: Journal = null
 
   private var lockFile: LockFile = null
@@ -107,7 +106,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
   private def failIfDatabaseIsLocked = config.failIfLocked
 
-  private def pageFile = pageFileFactory.getTxPageFile()
+  private def directFile = directFileFactory.getPageFile
+
+  private def indexFile = indexFileFactory.getTxPageFile()
 
 
   /////////////////////////////////////////////////////////////////////
@@ -176,14 +177,19 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       }
       journal.start
 
-      pageFileFactory.setFile(new File(directory, "db"))
-      pageFileFactory.setDrainOnClose(false)
-      pageFileFactory.setSync(true)
-      pageFileFactory.setUseWorkerThread(true)
-      pageFileFactory.setPageSize(config.indexPageSize)
-      pageFileFactory.setCacheSize(config.indexCacheSize);
+      directFileFactory.setFile(new File(directory, "direct"));
+      directFileFactory.setHeaderSize(0);
+      directFileFactory.setPageSize(1024)
+      directFileFactory.open
+
+      indexFileFactory.setFile(new File(directory, "db"))
+      indexFileFactory.setDrainOnClose(false)
+      indexFileFactory.setSync(true)
+      indexFileFactory.setUseWorkerThread(true)
+      indexFileFactory.setPageSize(config.indexPageSize)
+      indexFileFactory.setCacheSize(config.indexCacheSize);
 
-      pageFileFactory.open()
+      indexFileFactory.open
 
       val initialized = withTx { tx =>
           if (!tx.allocator().isAllocated(0)) {
@@ -198,6 +204,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
             rootBean.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
             rootBean.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
             rootBean.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
+            rootBean.setDirectIndexPage(alloc(DIRECT_INDEX_FACTORY))
             storedRootBuffer = rootBean.freeze
             helper.storeRootBean
 
@@ -210,11 +217,25 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       }
 
       if( initialized ) {
-        pageFile.flush()
+        indexFile.flush()
       }
 
       recover(onComplete)
 
+      // update the allocated free list in the direct file
+      // by unfreeing the items contained in the direct index.
+      withTx { tx =>
+        val helper = new TxHelper(tx)
+        import JavaConversions._
+        import helper._
+
+        directIndex.iterator.foreach { entry =>
+          val record = entry.getValue
+          val page_count: Int = directFile.pages(record.getSize)
+          directFile.allocator.unfree(record.getPage, page_count)
+        }
+      }
+
       // Schedule periodic jobs.. they keep executing while schedule_version remains the same.
       scheduleCleanup(schedule_version.get())
       scheduleFlush(schedule_version.get())
@@ -224,10 +245,66 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   def stop() = {
     schedule_version.incrementAndGet
     journal.close
-    pageFileFactory.close
+    indexFileFactory.close
     lockFile.unlock
   }
 
+  val last_direct_key = new AtomicLong
+
+  def createDirectRecord(size: Int)(callback: (DirectRecord) => Unit) = {
+
+    val page_count: Int = directFile.pages(size)
+    val page = directFile.allocator.alloc(page_count)
+    val buffer = directFile.slice(SliceType.READ_WRITE, page, page_count)
+    val key = last_direct_key.incrementAndGet
+
+    val pb = new AddDirect.Bean
+    pb.setDirectKey(key)
+    pb.setSize(size)
+    pb.setPage(page)
+    _store(pb, null)
+
+    val record = new DirectRecord
+    record.key = key
+    record.size = size
+    record.buffer = buffer
+    callback(record)
+  }
+
+  def openDirectRecord(key: Long)(callback: (Option[DirectRecord]) => Unit) = {
+    val result = withTx { tx =>
+      val helper = new TxHelper(tx)
+      import helper._
+      val pb:AddDirect.Getter = directIndex.get(key)
+      if( pb!=null ) {
+        val page_count: Int = directFile.pages(pb.getSize)
+        val buffer = directFile.slice(SliceType.READ, pb.getPage, page_count)
+
+        val record = new DirectRecord
+        record.key = key
+        record.size = pb.getSize
+        record.buffer = buffer
+
+        Some(record)
+      } else {
+        None
+      }
+    }
+    callback(result)
+  }
+
+
+  def closeDirectRecord(record: DirectRecord) = {
+    directFile.unslice(record.buffer)
+  }
+
+  def removeDirectRecord(key: Long)(callback: (Boolean) => Unit) = {
+    val update = new RemoveDirect.Bean
+    update.setDirectKey(key)
+    _store(update, null)
+  }
+
+
   def addQueue(record: QueueRecord, callback:Runnable) = {
     val update = new AddQueue.Bean()
     update.setKey(record.key)
@@ -290,7 +367,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   def getQueueStatus(queueKey: Long): Option[QueueStatus] = {
     withTx { tx =>
         val helper = new TxHelper(tx)
-        import JavaConversions._
         import helper._
 
         val queueRecord = queueIndex.get(queueKey)
@@ -319,8 +395,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
         val helper = new TxHelper(tx)
         import JavaConversions._
         import helper._
-        import Predicates._
-
         val queueRecord = queueIndex.get(queueKey)
         if (queueRecord != null) {
           val entryIndex = queueEntryIndex(queueRecord)
@@ -383,7 +457,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
     val locations = withTx { tx =>
       val helper = new TxHelper(tx)
-      import JavaConversions._
       import helper._
       requests.flatMap { case (messageKey, callback)=>
         val location = metric_load_from_index.time {
@@ -412,7 +485,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     metric_load_from_index.start { end =>
       withTx { tx =>
         val helper = new TxHelper(tx)
-        import JavaConversions._
         import helper._
 
         val location = messageKeyIndex.get(messageKey)
@@ -922,10 +994,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
       case x: PutMapEntry.Getter =>
       case x: RemoveMapEntry.Getter =>
 
-      case x: OpenStream.Getter =>
-      case x: WriteStream.Getter =>
-      case x: CloseStream.Getter =>
-      case x: RemoveStream.Getter =>
+      case x: AddDirect.Getter =>
+
+        directIndex.put(x.key, x.freeze)
+
+      case x: RemoveDirect.Getter =>
+
+        val record:AddDirect.Getter = directIndex.remove(x.getDirectKey)
+        if( record!=null ) {
+          val page_count: Int = directFile.pages(record.getSize)
+          directFile.allocator.free(record.getPage, page_count)
+        }
+
     }
   }
 
@@ -950,7 +1030,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
 
   def flush() = {
     val start = System.currentTimeMillis()
-    pageFile.flush
+    indexFile.flush
     val end = System.currentTimeMillis()
     if (end - start > 1000) {
       warn("Index flush latency: %,.3f seconds", ((end - start) / 1000.0f))
@@ -1043,6 +1123,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, rootBuffer.getMessageKeyIndexPage)
     lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, rootBuffer.getMessageRefsIndexPage)
     lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, rootBuffer.getSubscriptionIndexPage)
+    lazy val directIndex = DIRECT_INDEX_FACTORY.open(_tx, rootBuffer.getDirectIndexPage)
 
     def addAndGet[K](index:SortedIndex[K, jl.Integer], key:K, amount:Int):Int = {
       var counter = index.get(key)
@@ -1104,7 +1185,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   }
 
   private def withTx[T](func: (Transaction) => T): T = {
-    val tx = pageFile.tx
+    val tx = indexFile.tx
     var ok = false
     try {
       val rc = func(tx)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul  7 04:17:35 2010
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.broker.store.hawtdb
 
-import org.apache.activemq.broker.store.{StoreUOW, Store}
 import org.fusesource.hawtdispatch.BaseRetained
 import java.util.concurrent.atomic.AtomicLong
 import collection.mutable.ListBuffer
@@ -30,6 +29,7 @@ import ReporterLevel._
 import java.util.concurrent._
 import org.apache.activemq.apollo.store._
 import org.apache.activemq.apollo.util.{IntMetricCounter, TimeCounter, IntCounter}
+import org.apache.activemq.broker.store.{DirectRecordStore, StoreUOW, Store}
 
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -58,7 +58,7 @@ object HawtDBStore extends Log {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class HawtDBStore extends Store with BaseService with DispatchLogging {
+class HawtDBStore extends Store with DirectRecordStore with BaseService with DispatchLogging {
 
   import HawtDBStore._
   override protected def log = HawtDBStore
@@ -125,7 +125,37 @@ class HawtDBStore extends Store with Bas
 
   /////////////////////////////////////////////////////////////////////
   //
-  // Implementation of the BrokerDatabase interface
+  // Implementation of the DirectRecordStore interface
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  def createDirectRecord(size: Int)(callback: (DirectRecord) => Unit) = {
+    executor_pool {
+      client.createDirectRecord(size)(callback)
+    }
+  }
+
+  def openDirectRecord(key: Long)(callback: (Option[DirectRecord]) => Unit) = {
+    executor_pool {
+      client.openDirectRecord(key)(callback)
+    }
+  }
+
+  def closeDirectRecord(record: DirectRecord) = {
+    executor_pool {
+      client.closeDirectRecord(record)
+    }
+  }
+
+  def removeDirectRecord(key: Long)(callback: (Boolean) => Unit) = {
+    executor_pool {
+      client.removeDirectRecord(key)(callback)
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Implementation of the Store interface
   //
   /////////////////////////////////////////////////////////////////////
 
@@ -133,11 +163,13 @@ class HawtDBStore extends Store with Bas
    * Deletes all stored data from the store.
    */
   def purge(callback: =>Unit) = {
-    client.purge(^{
-      next_queue_key.set(1)
-      next_msg_key.set(1)
-      callback
-    })
+    executor_pool {
+      client.purge(^{
+        next_queue_key.set(1)
+        next_msg_key.set(1)
+        callback
+      })
+    }
   }
 
 
@@ -145,29 +177,37 @@ class HawtDBStore extends Store with Bas
    * Ges the last queue key identifier stored.
    */
   def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
-    callback(Some(client.rootBuffer.getLastQueueKey.longValue))
+    executor_pool {
+      callback(Some(client.rootBuffer.getLastQueueKey.longValue))
+    }
   }
 
   def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
-    client.addQueue(record, ^{ callback(true) })
+    executor_pool {
+     client.addQueue(record, ^{ callback(true) })
+    }
   }
 
   def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
-    client.removeQueue(queueKey,^{ callback(true) })
+    executor_pool {
+      client.removeQueue(queueKey,^{ callback(true) })
+    }
   }
 
   def getQueueStatus(queueKey: Long)(callback: (Option[QueueStatus]) => Unit) = {
-    executor_pool ^{
+    executor_pool {
       callback( client.getQueueStatus(queueKey) )
     }
   }
 
   def listQueues(callback: (Seq[Long]) => Unit) = {
-    executor_pool ^{
+    executor_pool {
       callback( client.listQueues )
     }
   }
 
+  
+
   val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
   load_source.setEventHandler(^{drain_loads});
   load_source.resume

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul  7 04:17:35 2010
@@ -18,13 +18,13 @@ package org.apache.activemq.broker.store
 
 import model._
 import model.Type.TypeCreatable
-import org.apache.activemq.apollo.store.{MessageRecord, QueueRecord, QueueEntryRecord}
 import org.fusesource.hawtbuf.codec._
 import org.fusesource.hawtbuf.{UTF8Buffer, AsciiBuffer, Buffer}
 import java.io.{IOException, DataInput, DataOutput}
 import org.fusesource.hawtdb.internal.journal.{LocationCodec, Location}
 import org.fusesource.hawtdb.api._
 import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessage}
+import org.apache.activemq.apollo.store.{DirectRecord, MessageRecord, QueueRecord, QueueEntryRecord}
 
 /**
  * <p>
@@ -109,7 +109,7 @@ object Helpers {
     rc.protocol = pb.getProtocol
     rc.size = pb.getSize
     rc.value = pb.getValue
-    rc.stream = pb.getStreamKey
+    rc.directKey = pb.getStreamKey
     rc.expiration = pb.getExpiration
     rc
   }
@@ -120,7 +120,7 @@ object Helpers {
     pb.setProtocol(v.protocol)
     pb.setSize(v.size)
     pb.setValue(v.value)
-    pb.setStreamKey(v.stream)
+    pb.setStreamKey(v.directKey)
     pb.setExpiration(v.expiration)
     pb
   }
@@ -147,6 +147,13 @@ object Helpers {
     pb
   }
 
+  implicit def toDirectRecord(pb: AddDirect.Getter): DirectRecord = {
+    val rc = new DirectRecord
+    rc.key = pb.getDirectKey
+    rc.size = pb.getSize
+    rc
+  }
+  
   implicit def toLocation(value: Long): Location = {
     val temp = new Buffer(8)
     val editor = temp.bigEndianEditor
@@ -228,4 +235,9 @@ object Helpers {
   SUBSCRIPTIONS_INDEX_FACTORY.setValueCodec(AddSubscription.FRAMED_CODEC);
   SUBSCRIPTIONS_INDEX_FACTORY.setDeferredEncoding(true);
 
+  val DIRECT_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, AddDirect.Buffer]();
+  DIRECT_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE);
+  DIRECT_INDEX_FACTORY.setValueCodec(AddDirect.FRAMED_CODEC);
+  DIRECT_INDEX_FACTORY.setDeferredEncoding(true);
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 04:17:35 2010
@@ -29,9 +29,9 @@ import Stomp._
 import BufferConversions._
 import StompFrameConstants._
 import java.io.IOException
-import org.apache.activemq.broker.store.StoreUOW
 import org.apache.activemq.selector.SelectorParser
 import org.apache.activemq.filter.{BooleanExpression, FilterException}
+import org.apache.activemq.broker.store.{DirectRecordStore, StoreUOW}
 
 object StompConstants {
 
@@ -170,6 +170,10 @@ class StompProtocolHandler extends Proto
     connection.connector.broker.getDefaultVirtualHost(
       queue.wrap { (host)=>
         this.host=host
+        if( this.host.store!=null && this.host.store.isInstanceOf[DirectRecordStore] ) {
+          val wf = connection.transport.getWireformat.asInstanceOf[StompWireFormat]
+          wf.direct_record_store = this.host.store.asInstanceOf[DirectRecordStore]
+        }
         connection.transport.resumeRead
       }
     )

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 04:17:35 2010
@@ -70,7 +70,7 @@ class StompWireFormat extends WireFormat
   import StompWireFormat._
   override protected def log: Log = StompWireFormat
 
-  var directRecordStore:DirectRecordStore = null
+  var direct_record_store:DirectRecordStore = null
 
   implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
   implicit def wrap(x: Byte) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java Wed Jul  7 04:17:35 2010
@@ -22,13 +22,14 @@ import org.fusesource.hawtbuf.Buffer;
 import java.nio.ByteBuffer;
 
 /**
- * A memory mapped record.
+ * A memory mapped direct buffer associated with a key
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class DirectRecord {
 
     public long key = -1;
+    public int size = 0;
     public ByteBuffer buffer;
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java Wed Jul  7 04:17:35 2010
@@ -28,7 +28,7 @@ public class MessageRecord {
     public AsciiBuffer protocol;
     public int size;
     public Buffer value;
-    public long stream = -1;
+    public long directKey = -1;
     public long expiration = 0;
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:17:35 2010
@@ -105,6 +105,24 @@ trait Store extends ServiceTrait {
    */
   def loadMessage(messageKey:Long)(callback:(Option[MessageRecord])=>Unit )
 
-
 }
 
+/**
+ * Optional interface that stores can implement to give protocols direct access to the file system
+ * for them to be able to do
+ */
+trait DirectRecordStore {
+
+  def createDirectRecord(size:Int)(callback:(DirectRecord)=>Unit):Unit
+
+  def openDirectRecord(key:Long)(callback:(Option[DirectRecord])=>Unit):Unit
+
+  def closeDirectRecord(record:DirectRecord):Unit 
+
+  def removeDirectRecord(record:DirectRecord)(callback:(Boolean)=>Unit):Unit = {
+    removeDirectRecord(record.key)( callback )
+  }
+  def removeDirectRecord(key:Long)(callback:(Boolean)=>Unit):Unit
+
+
+}