You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:22:31 UTC

svn commit: r961216 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/ activemq-hawtdb/src/main/resources/META-IN...

Author: chirino
Date: Wed Jul  7 04:22:31 2010
New Revision: 961216

URL: http://svn.apache.org/viewvc?rev=961216&view=rev
Log:
renamed MemoryPool to DirectBufferPool.
starting to hooking in support for in in the store interfaces.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools
      - copied, changed from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPool.scala
      - copied, changed from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPool.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPoolSPI.scala
      - copied, changed from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPool.scala
      - copied, changed from r961215, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPool.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala
      - copied, changed from r961215, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPoolFactory.scala
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPool.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPool.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPoolFactory.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 04:22:31 2010
@@ -169,7 +169,7 @@ class Delivery extends BaseRetained {
   def createMessageRecord() = {
     val sm = new MessageRecord
     sm.protocol = message.protocol
-    sm.value = ProtocolFactory.get(message.protocol).encode(message)
+    sm.buffer = ProtocolFactory.get(message.protocol).encode(message)
     sm.size = size
     sm
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:22:31 2010
@@ -1167,7 +1167,7 @@ class QueueEntry(val queue:Queue, val se
         queue.loading_size -= size
 
         val delivery = new Delivery()
-        delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value)
+        delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.buffer)
         delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:22:31 2010
@@ -17,11 +17,9 @@
 package org.apache.activemq.apollo.broker;
 
 import _root_.java.util.{ArrayList, HashMap}
-import _root_.org.apache.activemq.Service
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue}
 import _root_.scala.collection.JavaConversions._
-import _root_.scala.reflect.BeanProperty
 import path.PathFilter
 import org.fusesource.hawtbuf.AsciiBuffer
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
@@ -31,10 +29,9 @@ import org.apache.activemq.broker.store.
 import org.fusesource.hawtbuf.proto.WireFormat
 import org.apache.activemq.apollo.store.{StoreFactory, QueueRecord}
 import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, CassandraStoreDTO, VirtualHostDTO}
-import java.io.File
 import java.util.concurrent.TimeUnit
 import org.apache.activemq.apollo.util.LongCounter
-import org.apache.activemq.apollo.{MemoryPoolFactory, MemoryPool}
+import org.apache.activemq.apollo.{DirectBufferPoolFactory, DirectBufferPool}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -103,7 +100,7 @@ class VirtualHost(val broker: Broker, va
   }
 
   var store:Store = null
-  var memory_pool:MemoryPool = null
+  var memory_pool:DirectBufferPool = null
   var transactionManager:TransactionManagerX = new TransactionManagerX
   var protocols = Map[AsciiBuffer, WireFormat]()
   val queue_id_counter = new LongCounter
@@ -128,18 +125,22 @@ class VirtualHost(val broker: Broker, va
 
   override protected def _start(onCompleted:Runnable):Unit = {
 
-//    val memory_pool_config: String = null
-    val memory_pool_config: String = "hawtdb:activemq.tmp"
+    val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
+    store = StoreFactory.create(config.store)
 
-    if( MemoryPoolFactory.validate(memory_pool_config) ) {
-      memory_pool = MemoryPoolFactory.create(memory_pool_config)
-      if( memory_pool!=null ) {
-        memory_pool.start
-      }
+    //    val memory_pool_config: String = null
+    var direct_buffer_pool_config: String = "hawtdb:activemq.tmp"
+
+    if( direct_buffer_pool_config!=null &&  (store!=null && !store.supportsDirectBuffers) ) {
+      warn("The direct buffer pool will not be used because the configured store does not support them.")
+      direct_buffer_pool_config = null
+    }
+
+    if( direct_buffer_pool_config!=null ) {
+      memory_pool = DirectBufferPoolFactory.create(direct_buffer_pool_config)
+      memory_pool.start
     }
 
-    val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
-    store = StoreFactory.create(config.store)
     if( store!=null ) {
       store.configure(config.store, this)
       val storeStartupDone = tracker.task("store startup")

Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul  7 04:22:31 2010
@@ -55,24 +55,22 @@ class CassandraClient() {
     }
   }
 
-  implicit def decodeMessageRecord(v: Array[Byte]): MessageRecord = {
+  def decodeMessageRecord(v: Array[Byte]): MessageRecord = {
     import PBMessageRecord._
     val pb = PBMessageRecord.FACTORY.parseUnframed(v)
     val rc = new MessageRecord
     rc.protocol = pb.getProtocol
     rc.size = pb.getSize
-    rc.value = pb.getValue
-    rc.directKey = pb.getStream
+    rc.buffer = pb.getValue
     rc.expiration = pb.getExpiration
     rc
   }
 
-  implicit def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
+  def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
     val pb = new PBMessageRecord.Bean
     pb.setProtocol(v.protocol)
     pb.setSize(v.size)
-    pb.setValue(v.value)
-    pb.setStream(v.directKey)
+    pb.setValue(v.buffer)
     pb.setExpiration(v.expiration)
     pb.freeze.toUnframedByteArray
   }
@@ -172,7 +170,7 @@ class CassandraClient() {
               case (msg, action) =>
                 var rc =
                 if (action.store != null) {
-                  operations ::= Insert( schema.message_data \ (msg, action.store) )
+                  operations ::= Insert( schema.message_data \ (msg, encodeMessageRecord(action.store) ) ) 
                 }
                 action.enqueues.foreach {
                   queueEntry =>
@@ -197,7 +195,7 @@ class CassandraClient() {
       session =>
         session.get(schema.message_data \ id) match {
           case Some(x) =>
-            val rc: MessageRecord = x.value
+            val rc: MessageRecord = decodeMessageRecord(x.value)
             rc.key = id
             Some(rc)
           case None =>

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools (from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools&r1=961215&r2=961216&rev=961216&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPool.scala (from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPool.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPool.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPool.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPool.scala&r1=961215&r2=961216&rev=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPool.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPool.scala Wed Jul  7 04:22:31 2010
@@ -18,7 +18,7 @@ package org.apache.activemq.broker.store
 
 import org.fusesource.hawtdispatch.BaseRetained
 import org.fusesource.hawtdb.api.Paged.SliceType
-import org.apache.activemq.apollo.{MemoryAllocation, MemoryPool}
+import org.apache.activemq.apollo.{DirectBuffer, DirectBufferPool}
 import java.nio.ByteBuffer
 import org.fusesource.hawtdb.api.PageFileFactory
 import java.io.File
@@ -29,7 +29,7 @@ import java.io.File
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class HawtDBMemoryPool(val file:File) extends MemoryPool {
+class HawtDBDirectBufferPool(val file:File) extends DirectBufferPool {
 
   private val pageFilefactory = new PageFileFactory()
   private def pageFile = pageFilefactory.getPageFile
@@ -56,10 +56,7 @@ class HawtDBMemoryPool(val file:File) ex
     }
   }
 
-  class HawtMemoryAllocation(page:Int, page_count:Int, alloc_size:Int, original:ByteBuffer, slice:ByteBuffer) extends BaseRetained with MemoryAllocation {
-    def size = alloc_size
-    def buffer = slice
-
+  class HawtMemoryAllocation(val page:Int, val page_count:Int, val original:ByteBuffer, val buffer:ByteBuffer) extends BaseRetained with DirectBuffer {
     override def dispose = {
       pageFile.unslice(original)
       pageFile.allocator.free(page, page_count)
@@ -70,10 +67,8 @@ class HawtDBMemoryPool(val file:File) ex
     val page_count: Int = pageFile.pages(alloc_size)
     val page = pageFile.allocator.alloc(page_count)
     val original = pageFile.slice(SliceType.READ_WRITE, page, page_count)
-
     original.limit(original.position+alloc_size)
-
     val slice = original.slice
-    new HawtMemoryAllocation(page, page_count, alloc_size, original, slice)
+    new HawtMemoryAllocation(page, page_count, original, slice)
   }
 }
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPoolSPI.scala (from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPoolSPI.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPoolSPI.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala&r1=961215&r2=961216&rev=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPoolSPI.scala Wed Jul  7 04:22:31 2010
@@ -18,27 +18,27 @@ package org.apache.activemq.broker.store
 
 import java.io.File
 import java.lang.String
-import org.apache.activemq.apollo.MemoryPoolFactory
+import org.apache.activemq.apollo.DirectBufferPoolFactory
 
 /**
  * <p>
- * Hook to use a HawtDBMemoryPool for the memory pool implementation.
+ * Hook to use a HawtDBDirectBufferPool for the memory pool implementation.
  * </p>
  * <p>
  * This class is discovered using the following resource file:
- * <code>META-INF/services/org.apache.activemq.apollo/memory-pools</code>
+ * <code>META-INF/services/org.apache.activemq.apollo/direct-buffer-pools</code>
  * </p>
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class HawtDBMemoryPoolSPI extends MemoryPoolFactory.SPI {
+class HawtDBDirectBufferPoolSPI extends DirectBufferPoolFactory.SPI {
 
   val prefix: String = "hawtdb:"
 
   def create(config: String) = {
     if( config.startsWith(prefix) ) {
       val file = new File(config.substring(prefix.length))
-      new HawtDBMemoryPool(file)
+      new HawtDBDirectBufferPool(file)
     } else {
       null
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul  7 04:22:31 2010
@@ -108,7 +108,7 @@ object Helpers {
     rc.key = pb.getMessageKey
     rc.protocol = pb.getProtocol
     rc.size = pb.getSize
-    rc.value = pb.getValue
+    rc.buffer = pb.getValue
     rc.expiration = pb.getExpiration
     rc
   }
@@ -118,7 +118,7 @@ object Helpers {
     pb.setMessageKey(v.key)
     pb.setProtocol(v.protocol)
     pb.setSize(v.size)
-    pb.setValue(v.value)
+    pb.setValue(v.buffer)
     pb.setExpiration(v.expiration)
     pb
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 04:22:31 2010
@@ -23,7 +23,7 @@ import collection.mutable.ListBuffer
 import java.lang.{String, Class}
 import java.io.DataOutput
 import org.apache.activemq.apollo.broker._
-import org.apache.activemq.apollo.MemoryAllocation
+import org.apache.activemq.apollo.DirectBuffer
 import org.fusesource.hawtdispatch.BaseRetained
 
 /**
@@ -192,7 +192,7 @@ case class BufferStompContent(content:Bu
   def utf8:UTF8Buffer = content.utf8
 }
 
-case class DirectStompContent(direct:MemoryAllocation) extends StompContent {
+case class DirectStompContent(direct:DirectBuffer) extends StompContent {
   def length = direct.size-1
 
   def writeTo(os:DataOutput) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 04:22:31 2010
@@ -31,7 +31,7 @@ import _root_.scala.collection.JavaConve
 import StompFrameConstants._
 import java.io.{EOFException, DataOutput, DataInput, IOException}
 import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
-import org.apache.activemq.apollo.{MemoryAllocation, MemoryPool}
+import org.apache.activemq.apollo.{DirectBuffer, DirectBufferPool}
 
 /**
  * Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
@@ -70,7 +70,7 @@ class StompWireFormat extends WireFormat
   import StompWireFormat._
   override protected def log: Log = StompWireFormat
 
-  var memory_pool:MemoryPool = null
+  var memory_pool:DirectBufferPool = null
 
   implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
   implicit def wrap(x: Byte) = {
@@ -507,7 +507,7 @@ class StompWireFormat extends WireFormat
   }
 
 
-  def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, ma:MemoryAllocation):FrameReader = (buffer)=> {
+  def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, ma:DirectBuffer):FrameReader = (buffer)=> {
     if( read_content_direct(ma) ) {
       next_action = read_action
       new StompFrame(ascii(action), headers.toList, DirectStompContent(ma))
@@ -516,7 +516,7 @@ class StompWireFormat extends WireFormat
     }
   }
 
-  def read_content_direct(ma:MemoryAllocation) = {
+  def read_content_direct(ma:DirectBuffer) = {
       val read_limit = ma.buffer.position
       if( read_limit < ma.size ) {
         read_end = read_limit

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java Wed Jul  7 04:22:31 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.apollo.store;
 
+import org.apache.activemq.apollo.DirectBuffer;
 import org.fusesource.hawtbuf.AsciiBuffer;
 import org.fusesource.hawtbuf.Buffer;
 
@@ -27,8 +28,8 @@ public class MessageRecord {
     public long key = -1;
     public AsciiBuffer protocol;
     public int size;
-    public Buffer value;
-    public long directKey = -1;
+    public Buffer buffer;
+    public DirectBuffer direct_buffer = null;
     public long expiration = 0;
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:22:31 2010
@@ -34,6 +34,12 @@ trait Store extends ServiceTrait {
   def storeStatusDTO(callback:(StoreStatusDTO)=>Unit)
 
   /**
+   * @returns true if the store implementation can handle accepting
+   *          MessageRecords with DirectBuffers in them.
+   */
+  def supportsDirectBuffers() = false
+
+  /**
    * Creates a store uow which is used to perform persistent
    * operations as unit of work.
    */

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala Wed Jul  7 04:22:31 2010
@@ -98,8 +98,8 @@ abstract class StoreBenchmarkSupport ext
   def addMessage(batch:StoreUOW, content:String):Long = {
     var message = new MessageRecord
     message.protocol = ascii("test-protocol")
-    message.value = ascii(content).buffer
-    message.size = message.value.length
+    message.buffer = ascii(content).buffer
+    message.size = message.buffer.length
     batch.store(message)
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Wed Jul  7 04:22:31 2010
@@ -97,8 +97,8 @@ abstract class StoreFunSuiteSupport exte
   def addMessage(batch:StoreUOW, content:String):Long = {
     var message = new MessageRecord
     message.protocol = ascii("test-protocol")
-    message.value = ascii(content).buffer
-    message.size = message.value.length
+    message.buffer = ascii(content).buffer
+    message.size = message.buffer.length
     batch.store(message)
   }
 
@@ -138,7 +138,7 @@ abstract class StoreFunSuiteSupport exte
 
     val rc:Option[MessageRecord] = CB( cb=> store.loadMessage(msgKeys.head)(cb) )
     expect(ascii("message 1").buffer) {
-      rc.get.value
+      rc.get.buffer
     }
   }
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPool.scala (from r961215, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPool.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPool.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPool.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPool.scala&r1=961215&r2=961216&rev=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPool.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPool.scala Wed Jul  7 04:22:31 2010
@@ -22,12 +22,13 @@ import org.apache.activemq.Service
 
 /**
  * <p>
+ * A DirectBuffer holds a reference counted Direct ByteBuffer
  * </p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait MemoryAllocation extends Retained {
-  def size:Int
+trait DirectBuffer extends Retained {
+  def size = buffer.capacity
   def buffer:ByteBuffer
 }
 
@@ -37,6 +38,6 @@ trait MemoryAllocation extends Retained 
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait MemoryPool extends Service {
-  def alloc(size:Int):MemoryAllocation
+trait DirectBufferPool extends Service {
+  def alloc(size:Int):DirectBuffer
 }
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala (from r961215, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPoolFactory.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPoolFactory.scala&r1=961215&r2=961216&rev=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPoolFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala Wed Jul  7 04:22:31 2010
@@ -24,37 +24,37 @@ import org.apache.activemq.apollo.util.C
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object MemoryPoolFactory {
+object DirectBufferPoolFactory {
 
-  val finder = ClassFinder[SPI]("META-INF/services/org.apache.activemq.apollo/memory-pools")
-  var memory_pool_spis = List[SPI]()
+  val finder = ClassFinder[SPI]("META-INF/services/org.apache.activemq.apollo/direct-buffer-pools")
+  var direct_buffer_pools_spis = List[SPI]()
 
   trait SPI {
-    def create(config:String):MemoryPool
+    def create(config:String):DirectBufferPool
     def validate(config: String):Boolean
   }
 
   finder.find.foreach{ clazz =>
     try {
       val SPI = clazz.newInstance.asInstanceOf[SPI]
-      memory_pool_spis ::= SPI
+      direct_buffer_pools_spis ::= SPI
     } catch {
       case e:Throwable =>
         e.printStackTrace
     }
   }
 
-  def create(config:String):MemoryPool = {
+  def create(config:String):DirectBufferPool = {
     if( config == null ) {
       return null
     }
-    memory_pool_spis.foreach { spi=>
+    direct_buffer_pools_spis.foreach { spi=>
       val rc = spi.create(config)
       if( rc!=null ) {
         return rc
       }
     }
-    throw new IllegalArgumentException("Uknonwn memory pool type: "+config)
+    throw new IllegalArgumentException("Uknonwn direct buffer pool type: "+config)
   }
 
 
@@ -62,7 +62,7 @@ object MemoryPoolFactory {
     if( config == null ) {
       return true
     } else {
-      memory_pool_spis.foreach { spi=>
+      direct_buffer_pools_spis.foreach { spi=>
         if( spi.validate(config) ) {
           return true
         }