You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/29 19:43:03 UTC

svn commit: r1162914 - in /activemq/activemq-apollo/trunk: ./ apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/ apollo-broker/src/main/proto/ apollo-broker/...

Author: chirino
Date: Mon Aug 29 17:43:03 2011
New Revision: 1162914

URL: http://svn.apache.org/viewvc?rev=1162914&view=rev
Log:
Avoid using deprecated hawtdispatch APIs.  This in turn required us to rework the zero copy strategy.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
      - copied, changed from r1162770, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileDirectBufferAllocator.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Mon Aug 29 17:43:03 2011
@@ -64,11 +64,11 @@ class BDBClient(store: BDBStore) {
 
   var environment:Environment = _
 
-  var zero_copy_buffer_allocator: FileZeroCopyBufferAllocator = _
+  var direct_buffer_allocator: FileDirectBufferAllocator = _
 
-  def zero_copy_dir = {
+  def direct_buffer_file = {
     import FileSupport._
-    config.directory / "zerocp"
+    config.directory / "dbuffer.dat"
   }
 
   def start() = {
@@ -79,10 +79,7 @@ class BDBClient(store: BDBStore) {
 
     directory.mkdirs
 
-    if( Option(config.zero_copy).map(_.booleanValue).getOrElse(false) ) {
-      zero_copy_buffer_allocator = new FileZeroCopyBufferAllocator(zero_copy_dir)
-      zero_copy_buffer_allocator.start
-    }
+    direct_buffer_allocator = new FileDirectBufferAllocator(direct_buffer_file)
 
     environment = new Environment(directory, env_config);
 
@@ -92,22 +89,18 @@ class BDBClient(store: BDBStore) {
       message_refs_db
       queues_db
 
-      if( zero_copy_buffer_allocator!=null ) {
-        zerocp_db.cursor(tx) { (_,value)=>
-          val v = decode_zcp_value(value)
-          zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
-          true
-        }
+      lobs_db.cursor(tx) { (_,value)=>
+        val v = decode_lob_value(value)
+        direct_buffer_allocator.alloc_at(v._1, v._2)
+        true
       }
     }
   }
 
   def stop() = {
     environment.close
-    if( zero_copy_buffer_allocator!=null ) {
-      zero_copy_buffer_allocator.stop
-      zero_copy_buffer_allocator = null
-    }
+    direct_buffer_allocator.close
+    direct_buffer_allocator = null
   }
 
   case class TxContext(tx:Transaction) {
@@ -137,12 +130,12 @@ class BDBClient(store: BDBStore) {
       _messages_db
     }
 
-    private var _zerocp_db:Database = _
-    def zerocp_db:Database = {
-      if( _zerocp_db==null ) {
-        _zerocp_db = environment.openDatabase(tx, "zerocp", long_key_conf)
+    private var _lobs_db:Database = _
+    def lobs_db:Database = {
+      if( _lobs_db==null ) {
+        _lobs_db = environment.openDatabase(tx, "lobs", long_key_conf)
       }
-      _zerocp_db
+      _lobs_db
     }
 
     private var _message_refs_db:Database = _
@@ -185,6 +178,9 @@ class BDBClient(store: BDBStore) {
       if( _map_db!=null ) {
         _map_db.close
       }
+      if( _lobs_db!=null ) {
+        _lobs_db.close
+      }
 
       if(ok){
         tx.commit
@@ -299,13 +295,11 @@ class BDBClient(store: BDBStore) {
     import ctx._
     if( add_and_get(message_refs_db, msg_key, -1, tx)==0 ) {
       messages_db.delete(tx, msg_key)
-      if( zero_copy_buffer_allocator!=null ){
-        zerocp_db.get(tx, to_database_entry(msg_key)).foreach { v=>
-          val location  = decode_zcp_value(v)
-          zero_copy_buffer_allocator.free(location._1, location._2, location._3)
-        }
-        zerocp_db.delete(tx, msg_key)
+      lobs_db.get(tx, to_database_entry(msg_key)).foreach { v=>
+        val location  = decode_lob_value(v)
+        direct_buffer_allocator.free(location._1, location._2)
       }
+      lobs_db.delete(tx, msg_key)
     }
   }
 
@@ -334,7 +328,7 @@ class BDBClient(store: BDBStore) {
     val sync = uows.find( ! _.complete_listeners.isEmpty ).isDefined
     with_ctx(sync) { ctx=>
       import ctx._
-      var zcp_files_to_sync = Set[Int]()
+      var sync_lobs = false
       uows.foreach { uow =>
 
           for((key,value) <- uow.map_actions) {
@@ -352,14 +346,13 @@ class BDBClient(store: BDBStore) {
               if (message_record != null) {
                 import PBSupport._
 
-                val pb = if( message_record.zero_copy_buffer != null ) {
+                val pb = if( message_record.direct_buffer != null ) {
                   val r = to_pb(action.message_record).copy
-                  val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
-                  r.setZcpFile(buffer.file)
-                  r.setZcpOffset(buffer.offset)
-                  r.setZcpSize(buffer.size)
-                  zerocp_db.put(tx, message_record.key, (buffer.file, buffer.offset, buffer.size))
-                  zcp_files_to_sync += buffer.file
+                  val buffer = direct_buffer_allocator.copy(message_record.direct_buffer)
+                  r.setDirectOffset(buffer.offset)
+                  r.setDirectSize(buffer.size)
+                  lobs_db.put(tx, message_record.key, (buffer.offset, buffer.size))
+                  sync_lobs = true
                   r.freeze
                 } else {
                   to_pb(action.message_record)
@@ -379,8 +372,8 @@ class BDBClient(store: BDBStore) {
               }
           }
       }
-      if( zero_copy_buffer_allocator!=null ) {
-        zcp_files_to_sync.foreach(zero_copy_buffer_allocator.sync(_))
+      if( sync_lobs ) {
+        direct_buffer_allocator.sync
       }
     }
     callback.run
@@ -490,8 +483,8 @@ class BDBClient(store: BDBStore) {
             import PBSupport._
             val pb:MessagePB.Buffer = data
             val rc = from_pb(pb)
-            if( pb.hasZcpFile ) {
-              rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
+            if( pb.hasDirectSize ) {
+              rc.direct_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
             }
             rc
           }
@@ -519,8 +512,8 @@ class BDBClient(store: BDBStore) {
             import PBSupport._
             val pb:MessagePB.Buffer = data
             val rc = from_pb(pb)
-            if( pb.hasZcpFile ) {
-              rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
+            if( pb.hasDirectSize ) {
+              rc.direct_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
             }
             rc
           }
@@ -583,16 +576,10 @@ class BDBClient(store: BDBStore) {
           messages_db.cursor(tx) { (_, data) =>
             import PBSupport._
             val pb = MessagePB.FACTORY.parseUnframed(data.getData)
-            if( pb.hasZcpFile ) {
-              val zcpb = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
-              var data = pb.copy
-              data.clearZcpFile
-              data.clearZcpFile
-              // write the pb frame and then the direct buffer data..
-              data.freeze.writeFramed(message_stream)
-              zcpb.read(message_stream)
-            } else {
-              pb.writeFramed(message_stream)
+            pb.writeFramed(message_stream)
+            if( pb.hasDirectSize ) {
+              val buffer_cp_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
+              buffer_cp_buffer.read(message_stream)
             }
             true
           }
@@ -641,9 +628,6 @@ class BDBClient(store: BDBStore) {
           }
         }
 
-        var zcp_counter = 0
-        val max_ctx = zero_copy_buffer_allocator.contexts.size
-
         streams.using_map_stream { stream=>
           foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
             map_db.put(tx, pb.getKey, pb.getValue)
@@ -653,19 +637,14 @@ class BDBClient(store: BDBStore) {
         streams.using_message_stream { message_stream=>
           foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
 
-            val record:MessagePB.Buffer = if( pb.hasZcpSize ) {
-              val cp = pb.copy
-              val zcpb = zero_copy_buffer_allocator.contexts(zcp_counter % max_ctx).alloc(cp.getZcpSize)
-              cp.setZcpFile(zcpb.file)
-              cp.setZcpOffset(zcpb.offset)
-
-              zcp_counter += 1
-              zcpb.write(message_stream)
-
-              zerocp_db.put(tx, pb.getMessageKey, (zcpb.file, zcpb.offset, zcpb.size))
-              cp.freeze
-            } else {
-              pb
+            var record:MessagePB.Buffer = pb
+            if( record.hasDirectSize ) {
+              val cp = record.copy
+              val buffer_cp_buffer = direct_buffer_allocator.alloc(cp.getDirectSize)
+              cp.setDirectOffset(buffer_cp_buffer.offset)
+              buffer_cp_buffer.write(message_stream)
+              lobs_db.put(tx, pb.getMessageKey, (buffer_cp_buffer.offset, buffer_cp_buffer.size))
+              record = cp.freeze
             }
 
             messages_db.put(tx, record.getMessageKey, record)

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Mon Aug 29 17:43:03 2011
@@ -57,8 +57,6 @@ class BDBStore(var config:BDBStoreDTO) e
   
   protected def get_next_msg_key = next_msg_key.getAndIncrement
 
-  override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = client.zero_copy_buffer_allocator
-
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
     write_executor {
       client.store(uows, ^{

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala Mon Aug 29 17:43:03 2011
@@ -38,19 +38,17 @@ object HelperTrait {
   implicit def to_buffer(entry: DatabaseEntry): Buffer = new Buffer(entry.getData)
   implicit def to_database_entry(v: Buffer): DatabaseEntry = new DatabaseEntry(v.toByteArray)
 
-  implicit def decode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = {
+  implicit def decode_lob_value(entry: DatabaseEntry): (Long,Int) = {
     val in = new DataByteArrayInputStream(entry.getData)
-    (in.readVarInt(), in.readVarLong(), in.readVarInt())
+    (in.readVarLong(), in.readVarInt())
   }
-  implicit def encode_zcp_value(v: (Int,Long,Int)): DatabaseEntry = {
+  implicit def encode_zcp_value(v: (Long,Int)): DatabaseEntry = {
     val out = new DataByteArrayOutputStream(
-      AbstractVarIntSupport.computeVarIntSize(v._1) +
-      AbstractVarIntSupport.computeVarLongSize(v._2) +
-      AbstractVarIntSupport.computeVarIntSize(v._3)
+      AbstractVarIntSupport.computeVarLongSize(v._1) +
+      AbstractVarIntSupport.computeVarIntSize(v._2)
     )
-    out.writeVarInt(v._1)
-    out.writeVarLong(v._2)
-    out.writeVarInt(v._3)
+    out.writeVarLong(v._1)
+    out.writeVarInt(v._2)
     new DatabaseEntry(out.toBuffer.data)
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java Mon Aug 29 17:43:03 2011
@@ -37,9 +37,6 @@ public class BDBStoreDTO extends StoreDT
     @XmlAttribute(name="read_threads")
     public Integer read_threads;
 
-    @XmlAttribute(name="zero_copy")
-    public Boolean zero_copy;
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -50,7 +47,6 @@ public class BDBStoreDTO extends StoreDT
 
         if (directory != null ? !directory.equals(that.directory) : that.directory != null) return false;
         if (read_threads != null ? !read_threads.equals(that.read_threads) : that.read_threads != null) return false;
-        if (zero_copy != null ? !zero_copy.equals(that.zero_copy) : that.zero_copy != null) return false;
 
         return true;
     }
@@ -60,7 +56,6 @@ public class BDBStoreDTO extends StoreDT
         int result = super.hashCode();
         result = 31 * result + (directory != null ? directory.hashCode() : 0);
         result = 31 * result + (read_threads != null ? read_threads.hashCode() : 0);
-        result = 31 * result + (zero_copy != null ? zero_copy.hashCode() : 0);
         return result;
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Mon Aug 29 17:43:03 2011
@@ -29,10 +29,10 @@ message MessagePB {
   optional bytes value = 4;
   optional sint64 expiration = 5;
   
-  optional bytes zcp_data = 10;
-  optional int32 zcp_file = 12;
-  optional int64 zcp_offset = 13;
-  optional int32 zcp_size = 14;
+  optional bytes direct_data = 10;
+  optional bytes direct_file = 12;
+  optional int64 direct_offset = 13;
+  optional int32 direct_size = 14;
 }
 
 message QueuePB {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Mon Aug 29 17:43:03 2011
@@ -25,7 +25,7 @@ import org.apache.activemq.apollo.util.O
 import org.apache.activemq.apollo.dto._
 import security._
 import security.SecuredResource.VirtualHostKind
-import store.{PersistentLongCounter, Store, StoreFactory}
+import store._
 
 trait VirtualHostFactory {
   def create(broker:Broker, dto:VirtualHostDTO):VirtualHost
@@ -104,6 +104,8 @@ class VirtualHost(val broker: Broker, va
   var connection_log:Log = _
   var console_log:Log = _
 
+  var direct_buffer_allocator:DirectBufferAllocator = null
+
   def resource_kind = VirtualHostKind
 
   @volatile
@@ -165,6 +167,13 @@ class VirtualHost(val broker: Broker, va
   override protected def _start(on_completed:Runnable):Unit = {
     apply_update
 
+    if ( config.heap_bypass.getOrElse(0) > 0 ) {
+      import org.apache.activemq.apollo.util.FileSupport._
+      val tmp_dir = broker.tmp / "heapbypass" / id
+      tmp_dir.recursive_delete
+      direct_buffer_allocator = new ConcurrentFileDirectBufferAllocator(tmp_dir)
+    }
+
     store = StoreFactory.create(config.store)
 
     val tracker = new LoggingTracker("virtual host startup", console_log)
@@ -229,7 +238,13 @@ class VirtualHost(val broker: Broker, va
         task.run()
       }
     }
-    tracker.callback(on_completed)
+    tracker.callback(dispatch_queue.runnable {
+      if( direct_buffer_allocator !=null ) {
+        direct_buffer_allocator.close
+        direct_buffer_allocator
+      }
+      on_completed.run()
+    })
   }
 
 

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala (from r1162770, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala&r1=1162770&r2=1162914&rev=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala Mon Aug 29 17:43:03 2011
@@ -26,8 +26,9 @@ import org.fusesource.hawtdispatch.Retai
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait ZeroCopyBufferAllocator {
-  def alloc(size:Int):ZeroCopyBuffer
+trait DirectBufferAllocator {
+  def alloc(size:Int):DirectBuffer
+  def close
 }
 
 /**
@@ -40,7 +41,7 @@ trait ZeroCopyBufferAllocator {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait ZeroCopyBuffer extends Retained {
+trait DirectBuffer extends Retained {
 
   def size:Int
 
@@ -50,6 +51,8 @@ trait ZeroCopyBuffer extends Retained {
 
   def read(src: Int, target: WritableByteChannel): Int
 
+  def copy(src:DirectBuffer): Unit
+
   def write(src:ReadableByteChannel, target:Int): Int
 
   def write(src:ByteBuffer, target:Int):Int

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileDirectBufferAllocator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileDirectBufferAllocator.scala?rev=1162914&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileDirectBufferAllocator.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileDirectBufferAllocator.scala Mon Aug 29 17:43:03 2011
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store
+
+import org.fusesource.hawtdispatch.BaseRetained
+import java.nio.channels.{FileChannel, WritableByteChannel, ReadableByteChannel}
+import java.io._
+import org.apache.activemq.apollo.util._
+import java.nio.channels.FileChannel.MapMode
+import java.security.{AccessController, PrivilegedAction}
+import java.nio.{MappedByteBuffer, ByteBuffer}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentHashMap, TimeUnit}
+import java.util.Comparator
+
+/**
+ * <p>Tracks allocated space</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class Allocation(offset:Long, size:Int) {
+  var _free_func: (Allocation)=>Unit = _
+  def free() = {
+    _free_func(this)
+  }
+}
+
+object Range {
+  def apply(a:Allocation):Range = Range(a.offset, a.size)
+}
+
+/**
+  * A range of space.
+  */
+case class Range(offset:Long, size:Long) {
+
+  // split the allocation..
+  def split(request:Int):(Range, Range) = {
+    assert(request < size)
+    var first = Range(offset, request)
+    var second = Range(offset+request, size-request)
+    (first, second)
+  }
+
+  // join the range..
+  def join(that:Range):Range = {
+    assert( that.offset == offset+size)
+    Range(offset, size+that.size)
+  }
+
+}
+
+trait Allocator {
+  def alloc(request:Int):Allocation
+
+  def chain(that:Allocator):Allocator = new Allocator() {
+    def alloc(request: Int): Allocation = {
+      val rc = Allocator.this.alloc(request)
+      if( rc == null ) {
+        that.alloc(request)
+      } else {
+        rc
+      }
+    }
+  }
+}
+
+/**
+ * <p>Manges allocation space using a couple trees to track the free areas.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class TreeAllocator(range:Range) extends Allocator {
+
+  // list of the free allocation areas.  Sorted by size then offset
+  val free_by_size = new TreeMap[Range, Zilch](new Comparator[Range] {
+    def compare(p1: Range, p2: Range) = {
+      var rc = p1.size - p2.size
+      if( rc!=0 ) {
+        rc = p1.offset - p2.offset
+      }
+      if ( rc == 0 ) {
+        0
+      } else if ( rc < 0 ) {
+        -1
+      } else {
+        1
+      }
+    }
+  })
+
+  // list of the free allocation areas sorted by offset.
+  val free_by_offset = new TreeMap[Long, Range]()
+
+  free_by_offset.put(range.offset, range)
+  free_by_size.put(range, null)
+
+  def alloc(request:Int):Allocation = {
+    var spot_entry = free_by_size.ceilingEntry(Range(0,request))
+    if( spot_entry== null ) {
+      return null
+    }
+
+    val range = spot_entry.getKey
+    free_by_size.removeEntry(spot_entry)
+    free_by_offset.remove(range.offset)
+
+    // might be the perfect size
+    val rc = if( range.size == request ) {
+      range
+    } else {
+      // split the allocation..
+      var (first, second) = range.split(request)
+
+      // put the free part in the free map.
+      free_by_offset.put(second.offset, second)
+      free_by_size.put(second, null)
+
+      first
+    }
+    val allocation = Allocation(rc.offset, request)
+    allocation._free_func = free
+    allocation
+  }
+
+  def alloc_at(req:Allocation):Boolean = {
+    var spot_entry = free_by_offset.floorEntry(req.offset)
+    if( spot_entry== null ) {
+      return false
+    }
+
+    var spot = spot_entry.getValue
+    if( spot.offset+spot.size < req.offset+req.size ) {
+      return false
+    }
+
+    free_by_offset.removeEntry(spot_entry)
+    free_by_size.remove(spot)
+
+    // only need to put back if it was not exactly what we need.
+    if( spot != req ) {
+
+      // deal with excess at the front
+      if( spot.offset != req.offset ) {
+        val (prev, next) = spot.split((req.offset - spot.offset).toInt)
+        free_by_offset.put(prev.offset, prev)
+        free_by_size.put(prev, null)
+        spot = next
+      }
+
+      // deal with excess at the rear
+      if( spot.size != req.size ) {
+        val (prev, next) = spot.split(req.size)
+        free_by_offset.put(next.offset, next)
+        free_by_size.put(next, null)
+      }
+    }
+
+    req._free_func = free
+    true
+  }
+
+  def free(allocation:Allocation):Unit = {
+
+    var prev_e = free_by_offset.floorEntry(allocation.offset)
+    var next_e = if( prev_e!=null ) {
+      prev_e.next
+    } else {
+      free_by_offset.ceilingEntry(allocation.offset)
+    }
+
+    val prev = Option(prev_e).map(_.getValue).map( a=> if(a.offset+a.size == allocation.offset) a else null ).getOrElse(null)
+    val next = Option(prev_e).map(_.getValue).map( a=> if(allocation.offset+allocation.size == a.offset) a else null ).getOrElse(null)
+
+    val range = Range(allocation)
+    (prev, next) match {
+      case (null, null)=>
+        allocation._free_func = null
+        free_by_size.put(range, null)
+        free_by_offset.put(range.offset, range)
+
+      case (prev, null)=>
+        val joined = prev.join(range)
+        free_by_size.remove(prev)
+        free_by_size.put(joined, null)
+        free_by_offset.put(joined.offset, joined)
+
+      case (null, next)=>
+        val joined = range.join(next)
+        free_by_size.remove(next)
+        free_by_size.put(joined, null)
+
+        free_by_offset.remove(next.offset)
+        free_by_offset.put(joined.offset, joined)
+
+      case (prev, next)=>
+        val joined = prev.join(range.join(next))
+        free_by_size.remove(prev)
+        free_by_size.remove(next)
+        free_by_size.put(joined, null)
+
+        free_by_offset.remove(next.offset)
+        free_by_offset.put(joined.offset, joined)
+    }
+  }
+}
+
+/**
+ * Helps minimize the active page set by allocating in areas
+ * which had previously been allocated.
+ */
+class ActiveAllocator(val range:Range) extends Allocator {
+
+  // the cold allocated start with all the free space..
+  val inactive = new TreeAllocator(range)
+
+  // the hot is clear of any free space.
+  val active = new TreeAllocator(range)
+
+  active.free_by_offset.clear
+  active.free_by_size.clear
+
+  // allocate out of the hot area first since
+  // that should result in less vm swapping
+  val chain = active.chain(inactive)
+
+  def alloc(request:Int):Allocation = {
+    var rc = chain.alloc(request)
+    if( rc!=null ) {
+      rc._free_func = free
+    }
+    rc
+  }
+
+  def free(allocation:Allocation):Unit = {
+    // put stuff back in the hot tree.
+    active.free(allocation)
+  }
+
+}
+
+/**
+ * <p>The ByteBufferReleaser allows you to more eagerly deallocate byte buffers.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ByteBufferReleaser {
+  val release: (ByteBuffer) => Unit = {
+
+    // Try to drill into the java.nio.DirectBuffer internals...
+    AccessController.doPrivileged(new PrivilegedAction[(ByteBuffer) => Unit]() {
+      def run = {
+        try {
+
+          val cleanerMethod = ByteBuffer.allocateDirect(1).getClass().getMethod("cleaner")
+          cleanerMethod.setAccessible(true)
+          val cleanMethod = cleanerMethod.getReturnType().getMethod("clean")
+
+          def clean(buffer: ByteBuffer):Unit = {
+            try {
+              val cleaner = cleanerMethod.invoke(buffer)
+              if (cleaner != null) {
+                cleanMethod.invoke(cleaner)
+              }
+            } catch {
+              case e: Throwable => e.printStackTrace
+            }
+          }
+
+          clean _
+        } catch {
+          case _ =>
+            def noop(buffer: ByteBuffer):Unit = { }
+            noop _
+        }
+      }
+    })
+  }
+}
+
+object FileDirectBufferAllocator {
+  val OS = System.getProperty("os.name").toLowerCase
+
+  val MMAP_TRANSFER_TO = Option(System.getProperty("apollo.MMAP_TRANSFER_TO")).map(_ == "true").getOrElse{
+    // System prop is not set.. lets pick a good default based on OS
+    if( OS.startsWith("mac") ) {
+      // mmap is faster on the mac than the FileChannel.transferTo call.
+      true
+    } else {
+      false
+    }
+  }
+  val MMAP_TRANSFER_FROM = Option(System.getProperty("apollo.MMAP_TRANSFER_FROM")).map(_ == "true").getOrElse{
+    // System prop is not set.. lets pick a good default based on OS
+    if( OS.startsWith("mac") ) {
+      false
+    } else {
+      false
+    }
+  }
+}
+
+class FileDirectBufferAllocator(val file:File) extends DirectBufferAllocator {
+  import FileDirectBufferAllocator._
+
+  file.getParentFile.mkdirs()
+
+  val allocator = new TreeAllocator(Range(0, Long.MaxValue))
+  val channel:FileChannel = new RandomAccessFile(file, "rw").getChannel
+  val free_queue = new ConcurrentLinkedQueue[Allocation]()
+  var current_size = 0L
+  var _mmap:MappedByteBuffer = _
+
+  channel.truncate(0);
+
+  def close() = {
+    if(_mmap!=null) {
+      ByteBufferReleaser.release(_mmap)
+      _mmap = null
+    }
+    channel.close()
+  }
+
+  def mmap_slice(offset:Long, size:Int) = {
+    if( _mmap == null ) {
+      _mmap = channel.map(MapMode.READ_WRITE, 0, current_size)
+    }
+
+    // remaps more of the file when needed.
+    if( _mmap.capacity < offset+size ) {
+      assert(current_size >= offset+size)
+      ByteBufferReleaser.release(_mmap)
+
+      val grow = 1024*1024*64
+      _mmap = channel.map(MapMode.READ_WRITE, 0, current_size+grow)
+
+      // initialize the grown part...
+      _mmap.position(current_size.toInt)
+      while(_mmap.hasRemaining) {
+        _mmap.put(0.toByte)
+      }
+      current_size += grow
+      _mmap.clear
+    }
+
+    _mmap.position(offset.toInt)
+    _mmap.limit(offset.toInt+size)
+    val slice = _mmap.slice
+    _mmap.clear
+    slice
+  }
+
+  /**
+   * <p>A ZeroCopyBuffer which was allocated on a file.</p>
+   *
+   * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+   */
+  class AllocationBuffer(val allocation:Allocation) extends BaseRetained with DirectBuffer {
+
+    def file = FileDirectBufferAllocator.this.file
+    def offset: Long = allocation.offset
+    def size: Int = allocation.size.toInt
+
+    var buffer = if( MMAP_TRANSFER_TO ) {
+      mmap_slice(offset, size)
+    } else {
+      null
+    }
+
+    override def dispose: Unit = {
+      free_queue.add(allocation)
+      if( buffer!=null ) {
+        ByteBufferReleaser.release(buffer)
+        buffer = null
+      }
+      super.dispose
+    }
+
+    def remaining(pos: Int): Int = size-pos
+
+    def time[T](name:String)(func: =>T):T = {
+      val c = new TimeCounter
+      try {
+        c.time(func)
+      } finally {
+        println("%s: %.2f".format(name, c.apply(true).maxTime(TimeUnit.MILLISECONDS)))
+      }
+    }
+
+    def read(src: Int, target: WritableByteChannel): Int = {
+      assert(retained > 0)
+      val count: Int = remaining(src)
+      assert(count>=0)
+
+      if( MMAP_TRANSFER_TO ) {
+        buffer.position(src);
+        buffer.limit(src+count)
+        val slice = buffer.slice();
+        try {
+          target.write(slice)
+        } finally {
+          ByteBufferReleaser.release(slice)
+        }
+      } else {
+        channel.transferTo(offset+src, count, target).toInt
+      }
+    }
+
+    def write(src: ReadableByteChannel, target:Int): Int = {
+      assert(retained > 0)
+      val count: Int = remaining(target)
+      assert(count>=0)
+
+      if( MMAP_TRANSFER_FROM ) {
+        buffer.position(target);
+        buffer.limit(target+count)
+        val slice = buffer.slice();
+        try {
+          src.read(slice)
+        } finally {
+          ByteBufferReleaser.release(slice)
+        }
+      } else {
+        channel.transferFrom(src, offset+target, count).toInt
+      }
+    }
+
+    def copy(src: DirectBuffer) = {
+      if( src.size != this.size ) {
+        throw new IllegalArgumentException("src buffer does not match the size of this buffer")
+      }
+      src.read(0, channel)
+    }
+
+    def read(target: OutputStream): Unit = {
+      assert(retained > 0)
+      val b = ByteBuffer.allocate(size.min(1024*4))
+      var pos = 0
+      while( remaining(pos)> 0 ) {
+        val count = channel.read(b, offset+pos)
+        if( count == -1 ) {
+          throw new EOFException()
+        }
+        target.write(b.array, 0, count)
+        pos += count
+        b.clear
+      }
+    }
+
+    def write(src: ByteBuffer, target: Int): Int = {
+      assert(retained > 0)
+      val diff = src.remaining - remaining(target)
+      if( diff > 0 ) {
+        src.limit(src.limit-diff)
+      }
+      try {
+        channel.write(src, offset+target).toInt
+      } finally {
+        if( diff > 0 ) {
+          src.limit(src.limit+diff)
+        }
+      }
+    }
+
+    def write(target: InputStream): Unit = {
+      assert(retained > 0)
+      val b = ByteBuffer.allocate(size.min(1024*4))
+      var pos = 0
+      while( remaining(pos)> 0 ) {
+        val max = remaining(pos).min(b.capacity)
+        b.clear
+        val count = target.read(b.array, 0, max)
+        if( count == -1 ) {
+          throw new EOFException()
+        }
+        val x = channel.write(b)
+        assert(x == count)
+        pos += count
+      }
+    }
+  }
+
+  def alloc(size: Int) = {
+    drain_free_allocations
+    val allocation = allocator.alloc(size)
+    assert(allocation!=null)
+    current_size = current_size.max(allocation.offset + allocation.size)
+    new AllocationBuffer(allocation)
+  }
+
+  def alloc_at(offset:Long, size:Int) = {
+    allocator.alloc_at(Allocation(offset, size))
+  }
+
+  def free(offset:Long, size:Int) = {
+    allocator.free(Allocation(offset, size))
+  }
+
+  def slice(offset:Long, size:Int) = {
+    new AllocationBuffer(Allocation(offset, size))
+  }
+
+  def drain_free_allocations = {
+    var allocation = free_queue.poll()
+    while( allocation!=null ) {
+      allocator.free(allocation)
+      allocation = free_queue.poll()
+    }
+  }
+
+  def copy(source:DirectBuffer) = {
+    val rc = alloc(source.size)
+    rc.copy(source)
+    rc
+  }
+
+  def sync = {
+    channel.force(true)
+  }
+}
+
+
+/**
+ * <p>A ZeroCopyBufferAllocator which allocates on files.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ConcurrentFileDirectBufferAllocator(val directory:File) extends DirectBufferAllocator {
+  import FileDirectBufferAllocator._
+
+  final val context_counter = new AtomicInteger();
+  final val contexts = new ConcurrentHashMap[Thread, FileDirectBufferAllocator]();
+
+  @volatile
+  var closed = false;
+
+  directory.mkdirs
+  closed = false;
+
+  def close() = {
+    closed = true;
+    import collection.JavaConversions._
+    contexts.values().foreach(_.close)
+    contexts.clear
+  }
+
+  def alloc(size: Int): DirectBuffer = {
+    val thread: Thread = Thread.currentThread()
+    var ctx = contexts.get(thread)
+    if( ctx == null ) {
+      if (closed) {
+        throw new IllegalStateException("Stopped");
+      } else {
+        var id = context_counter.incrementAndGet();
+        ctx = new FileDirectBufferAllocator(new File(directory, "zerocp-"+id+".data" ))
+        contexts.put(thread, ctx);
+      }
+    }
+    ctx.alloc(size)
+  }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala Mon Aug 29 17:43:03 2011
@@ -19,7 +19,7 @@ package org.apache.activemq.apollo.broke
 
 import org.fusesource.hawtbuf.AsciiBuffer
 import org.fusesource.hawtbuf.Buffer
-import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -30,7 +30,7 @@ class MessageRecord {
   var protocol: AsciiBuffer = _
   var size = 0
   var buffer: Buffer = _
-  var zero_copy_buffer: ZeroCopyBuffer = _
+  var direct_buffer: DirectBuffer = _
   var expiration = 0L
   var locator:AtomicReference[Array[Byte]] = _
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Mon Aug 29 17:43:03 2011
@@ -43,12 +43,6 @@ trait Store extends ServiceTrait {
   def get_store_status(callback:(StoreStatusDTO)=>Unit)
 
   /**
-   * @returns a ZeroCopyBufferAllocator if the store supports protocols
-   *          using zero copy buffers when transfering messages.
-   */
-  def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = null
-
-  /**
    * Creates a store uow which is used to perform persistent
    * operations as unit of work.
    */

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Mon Aug 29 17:43:03 2011
@@ -84,12 +84,18 @@ public class VirtualHostDTO extends Serv
     public LogCategoryDTO log_category;
 
     /**
+     * If set the the broker will avoid allocating messages larger than the configured
+     * setting on the JVM heap.  They will be held in temp files until consumed or persisted
+     */
+    @XmlElement(name="heap_bypass")
+    public Integer heap_bypass;
+
+    /**
      * To hold any other non-matching XML elements
      */
     @XmlAnyElement(lax=true)
     public List<Object> other = new ArrayList<Object>();
 
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -104,6 +110,7 @@ public class VirtualHostDTO extends Serv
         if (auto_create_destinations != null ? !auto_create_destinations.equals(that.auto_create_destinations) : that.auto_create_destinations != null)
             return false;
         if (dsubs != null ? !dsubs.equals(that.dsubs) : that.dsubs != null) return false;
+        if (heap_bypass != null ? !heap_bypass.equals(that.heap_bypass) : that.heap_bypass != null) return false;
         if (host_names != null ? !host_names.equals(that.host_names) : that.host_names != null) return false;
         if (log_category != null ? !log_category.equals(that.log_category) : that.log_category != null) return false;
         if (other != null ? !other.equals(that.other) : that.other != null) return false;
@@ -132,6 +139,7 @@ public class VirtualHostDTO extends Serv
         result = 31 * result + (regroup_connections != null ? regroup_connections.hashCode() : 0);
         result = 31 * result + (authentication != null ? authentication.hashCode() : 0);
         result = 31 * result + (log_category != null ? log_category.hashCode() : 0);
+        result = 31 * result + (heap_bypass != null ? heap_bypass.hashCode() : 0);
         result = 31 * result + (other != null ? other.hashCode() : 0);
         return result;
     }

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala Mon Aug 29 17:43:03 2011
@@ -61,8 +61,6 @@ class HawtDBStore(val config:HawtDBStore
   
   protected def get_next_msg_key = next_msg_key.getAndIncrement
 
-  override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = null
-
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
     write_executor {
       client.store(uows, ^{

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Mon Aug 29 17:43:03 2011
@@ -50,15 +50,14 @@ object JDBM2Client extends Log {
     def deserialize(in: SerializerInput) = decode_queue_entry_record(in)
   }
 
-  object ZeroCopyValueSerializer extends Serializer[(Int, Long, Int)] {
-    def serialize(out: SerializerOutput, v: (Int,Long, Int)) = {
-      out.writePackedInt(v._1)
-      out.writePackedLong(v._2)
-      out.writePackedInt(v._3)
+  object ZeroCopyValueSerializer extends Serializer[(Long, Int)] {
+    def serialize(out: SerializerOutput, v: (Long, Int)) = {
+      out.writePackedLong(v._1)
+      out.writePackedInt(v._2)
     }
 
     def deserialize(in: SerializerInput) = {
-      (in.readPackedInt, in.readPackedLong, in.readPackedInt)
+      (in.readPackedLong, in.readPackedInt)
     }
   }
 
@@ -163,18 +162,18 @@ class JDBM2Client(store: JDBM2Store) {
   var queues_db:HTree[Long, QueueRecord] = _
   var entries_db:BTree[(Long,Long), QueueEntryRecord] = _
   var messages_db:HTree[Long, MessagePB.Buffer] = _
-  var zerocp_db:HTree[Long, (Int, Long, Int)] = _
+  var lobs_db:HTree[Long, (Long, Int)] = _
   var message_refs_db:HTree[Long, java.lang.Integer] = _
   var map_db:HTree[Buffer, Buffer] = _
 
   var last_message_key = 0L
   var last_queue_key = 0L
 
-  var zero_copy_buffer_allocator: FileZeroCopyBufferAllocator = _
+  var direct_buffer_allocator: FileDirectBufferAllocator = _
 
-  def zero_copy_dir = {
+  def direct_buffer_file = {
     import FileSupport._
-    config.directory / "zerocp"
+    config.directory / "dbuffer.data"
   }
 
   def start() = {
@@ -184,10 +183,7 @@ class JDBM2Client(store: JDBM2Store) {
 
     config.directory.mkdirs
 
-    if( Option(config.zero_copy).map(_.booleanValue).getOrElse(false) ) {
-      zero_copy_buffer_allocator = new FileZeroCopyBufferAllocator(zero_copy_dir)
-      zero_copy_buffer_allocator.start
-    }
+    direct_buffer_allocator = new FileDirectBufferAllocator(direct_buffer_file)
 
     recman = RecordManagerFactory.createRecordManager((config.directory / "jdbm2").getCanonicalPath)
 
@@ -220,7 +216,7 @@ class JDBM2Client(store: JDBM2Store) {
     transaction {
       messages_db = init_htree("messages", value_serializer = MessageRecordSerializer)
       map_db = init_htree("map", value_serializer = BufferSerializer, key_serializer = BufferSerializer)
-      zerocp_db = init_htree("lobs", value_serializer = ZeroCopyValueSerializer)
+      lobs_db = init_htree("lobs", value_serializer = ZeroCopyValueSerializer)
       message_refs_db = init_htree("message_refs")
       queues_db = init_htree("queues", value_serializer = QueueRecordSerializer)
       entries_db = init_btree("enttries", new QueueEntryKeyComparator, QueueEntryKeySerializer, QueueEntryRecordSerializer)
@@ -228,11 +224,9 @@ class JDBM2Client(store: JDBM2Store) {
       last_message_key = Option(recman.getNamedObject("last_message_key")).map(_.longValue).getOrElse(0L)
       last_queue_key = Option(recman.getNamedObject("last_queue_key")).map(_.longValue).getOrElse(0L)
 
-      if( zero_copy_buffer_allocator!=null ) {
-        zerocp_db.cursor { (_,v)=>
-          zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
-          true
-        }
+      lobs_db.cursor { (_,v)=>
+        direct_buffer_allocator.alloc_at(v._1, v._2)
+        true
       }
     }
 
@@ -241,10 +235,8 @@ class JDBM2Client(store: JDBM2Store) {
   def stop() = {
     recman.close
     recman = null;
-    if( zero_copy_buffer_allocator!=null ) {
-      zero_copy_buffer_allocator.stop
-      zero_copy_buffer_allocator = null
-    }
+    direct_buffer_allocator.close
+    direct_buffer_allocator = null
   }
 
   def transaction[T](func: => T): T = {
@@ -268,8 +260,8 @@ class JDBM2Client(store: JDBM2Store) {
       if( config.directory.isDirectory ) {
         config.directory.listFiles.filter(_.getName.startsWith("jdbm2.")).foreach(_.delete)
       }
-      if( zero_copy_dir.isDirectory ) {
-        zero_copy_dir.listFiles.foreach(_.delete)
+      if( direct_buffer_file.isDirectory ) {
+        direct_buffer_file.listFiles.foreach(_.delete)
       }
     }
     if( recman!=null ) {
@@ -324,12 +316,10 @@ class JDBM2Client(store: JDBM2Store) {
       gc.foreach { key=>
         message_refs_db.remove(key)
         messages_db.remove(key)
-        if( zero_copy_buffer_allocator!=null ){
-          val location = zerocp_db.find(key)
-          if( location!=null ) {
-            zero_copy_buffer_allocator.free(location._1, location._2, location._3)
-            zerocp_db.remove(key)
-          }
+        val location = lobs_db.find(key)
+        if( location!=null ) {
+          direct_buffer_allocator.free(location._1, location._2)
+          lobs_db.remove(key)
         }
       }
     }
@@ -368,7 +358,7 @@ class JDBM2Client(store: JDBM2Store) {
 
   def store(uows: Seq[JDBM2Store#DelayableUOW], callback:Runnable) {
     transaction {
-      var zcp_files_to_sync = Set[Int]()
+      var direct_buffer_sync = false
       uows.foreach { uow =>
 
         for((key,value) <- uow.map_actions) {
@@ -384,14 +374,13 @@ class JDBM2Client(store: JDBM2Store) {
           val message_record = action.message_record
           if (message_record != null) {
 
-            val pb = if( message_record.zero_copy_buffer != null ) {
+            val pb = if( message_record.direct_buffer != null ) {
               val r = to_pb(action.message_record).copy
-              val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
-              r.setZcpFile(buffer.file)
-              r.setZcpOffset(buffer.offset)
-              r.setZcpSize(buffer.size)
-              zerocp_db.put(message_record.key, (buffer.file, buffer.offset, buffer.size))
-              zcp_files_to_sync += buffer.file
+              val buffer = direct_buffer_allocator.copy(message_record.direct_buffer)
+              r.setDirectOffset(buffer.offset)
+              r.setDirectSize(buffer.size)
+              lobs_db.put(message_record.key, (buffer.offset, buffer.size))
+              direct_buffer_sync = true
               r.freeze
             } else {
               to_pb(action.message_record)
@@ -416,8 +405,8 @@ class JDBM2Client(store: JDBM2Store) {
 
         }
       }
-      if( zero_copy_buffer_allocator!=null ) {
-        zcp_files_to_sync.foreach(zero_copy_buffer_allocator.sync(_))
+      if( direct_buffer_sync ) {
+        direct_buffer_allocator.sync
       }
     }
     callback.run
@@ -489,8 +478,8 @@ class JDBM2Client(store: JDBM2Store) {
       val record = metric_load_from_index_counter.time {
         Option(messages_db.find(message_key)).map{ pb=>
           val rc = from_pb(pb)
-          if( pb.hasZcpFile ) {
-            rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
+          if( pb.hasDirectSize ) {
+            rc.direct_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
           }
           rc
         }
@@ -531,16 +520,10 @@ class JDBM2Client(store: JDBM2Store) {
       }
       streams.using_message_stream { message_stream=>
         messages_db.cursor { (_, pb) =>
-          if( pb.hasZcpFile ) {
-            val zcpb = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
-            var data = pb.copy
-            data.clearZcpFile
-            data.clearZcpFile
-            // write the pb frame and then the direct buffer data..
-            data.freeze.writeFramed(message_stream)
-            zcpb.read(message_stream)
-          } else {
-            pb.writeFramed(message_stream)
+          pb.writeFramed(message_stream)
+          if( pb.hasDirectSize ) {
+            val buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
+            buffer.read(message_stream)
           }
           true
         }
@@ -608,27 +591,18 @@ class JDBM2Client(store: JDBM2Store) {
 
         recman.commit
 
-        var zcp_counter = 0
-        val max_ctx = zero_copy_buffer_allocator.contexts.size
-
         streams.using_message_stream { message_stream=>
           foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
 
-            val record:MessagePB.Buffer = if( pb.hasZcpSize ) {
-              val cp = pb.copy
-              val zcpb = zero_copy_buffer_allocator.contexts(zcp_counter % max_ctx).alloc(cp.getZcpSize)
-              cp.setZcpFile(zcpb.file)
-              cp.setZcpOffset(zcpb.offset)
-
-              zcp_counter += 1
-              zcpb.write(message_stream)
-
-              zerocp_db.put(pb.getMessageKey, (zcpb.file, zcpb.offset, zcpb.size))
-              cp.freeze
-            } else {
-              pb
+            var record:MessagePB.Buffer = pb
+            if( pb.hasDirectSize ) {
+              val cp = record.copy
+              val buffer = direct_buffer_allocator.alloc(cp.getDirectSize)
+              cp.setDirectOffset(buffer.offset)
+              buffer.write(message_stream)
+              lobs_db.put(pb.getMessageKey, (buffer.offset, buffer.size))
+              record = cp.freeze
             }
-
             messages_db.put(record.getMessageKey, record)
             check_flush(record.getSize, 1024*124*10)
           }

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Mon Aug 29 17:43:03 2011
@@ -54,8 +54,6 @@ class JDBM2Store(var config:JDBM2StoreDT
   
   protected def get_next_msg_key = next_msg_key.getAndIncrement
 
-  override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = client.zero_copy_buffer_allocator
-
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
     executor {
       client.store(uows, ^{

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java Mon Aug 29 17:43:03 2011
@@ -37,9 +37,6 @@ public class JDBM2StoreDTO extends Store
     @XmlAttribute(name="compact_interval")
     public Integer compact_interval;
 
-    @XmlAttribute(name="zero_copy")
-    public Boolean zero_copy;
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -51,7 +48,6 @@ public class JDBM2StoreDTO extends Store
         if (compact_interval != null ? !compact_interval.equals(that.compact_interval) : that.compact_interval != null)
             return false;
         if (directory != null ? !directory.equals(that.directory) : that.directory != null) return false;
-        if (zero_copy != null ? !zero_copy.equals(that.zero_copy) : that.zero_copy != null) return false;
 
         return true;
     }
@@ -61,7 +57,6 @@ public class JDBM2StoreDTO extends Store
         int result = super.hashCode();
         result = 31 * result + (directory != null ? directory.hashCode() : 0);
         result = 31 * result + (compact_interval != null ? compact_interval.hashCode() : 0);
-        result = 31 * result + (zero_copy != null ? zero_copy.hashCode() : 0);
         return result;
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Mon Aug 29 17:43:03 2011
@@ -30,7 +30,7 @@ import org.apache.activemq.apollo.transp
 import _root_.org.fusesource.hawtbuf._
 import Buffer._
 import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.broker.store.{ZeroCopyBuffer, ZeroCopyBufferAllocator, MessageRecord}
+import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord}
 import org.apache.activemq.apollo.util.Log._
 
 object StompCodec extends Log {
@@ -46,10 +46,10 @@ object StompCodec extends Log {
     rc.expiration = message.expiration
 
     if( frame.content.isInstanceOf[ZeroCopyContent] ) {
-      rc.zero_copy_buffer = frame.content.asInstanceOf[ZeroCopyContent].zero_copy_buffer
+      rc.direct_buffer = frame.content.asInstanceOf[ZeroCopyContent].zero_copy_buffer
     }
 
-    def buffer_size = if (rc.zero_copy_buffer!=null) { frame.size - (rc.zero_copy_buffer.size - 1) } else { frame.size }
+    def buffer_size = if (rc.direct_buffer!=null) { frame.size - (rc.direct_buffer.size - 1) } else { frame.size }
     val os = new ByteArrayOutputStream(buffer_size)
 
     frame.action.writeTo(os)
@@ -82,7 +82,7 @@ object StompCodec extends Log {
         os.write(NEWLINE)
       }
       os.write(NEWLINE)
-      if ( rc.zero_copy_buffer==null ) {
+      if ( rc.direct_buffer==null ) {
         frame.content.writeTo(os)
       }
     }
@@ -127,10 +127,10 @@ object StompCodec extends Log {
       line = read_line
     }
 
-    if( message.zero_copy_buffer==null ) {
+    if( message.direct_buffer==null ) {
       new StompFrameMessage(new StompFrame(action, headers.toList, BufferContent(buffer)))
     } else {
-      new StompFrameMessage(new StompFrame(action, headers.toList, ZeroCopyContent(message.zero_copy_buffer)))
+      new StompFrameMessage(new StompFrame(action, headers.toList, ZeroCopyContent(message.direct_buffer)))
     }
   }
 
@@ -143,7 +143,7 @@ class StompCodec extends ProtocolCodec {
   var max_headers = 1000
   var max_data_length = 1024 * 1024 * 100
 
-  var zero_copy_buffer_allocator:ZeroCopyBufferAllocator = null
+  var direct_buffer_allocator:DirectBufferAllocator = null
 
   implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
   implicit def wrap(x: Byte) = {
@@ -164,10 +164,10 @@ class StompCodec extends ProtocolCodec {
   var write_channel:WritableByteChannel = null
 
   var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
-  var next_write_direct:ZeroCopyBuffer = null
+  var next_write_direct:DirectBuffer = null
 
   var write_buffer = ByteBuffer.allocate(0)
-  var write_direct:ZeroCopyBuffer = null
+  var write_direct:DirectBuffer = null
   var write_direct_pos = 0
   var last_write_io_size = 0
 
@@ -313,7 +313,7 @@ class StompCodec extends ProtocolCodec {
 
   var last_read_io_size = 0
 
-  var read_direct:ZeroCopyBuffer = null
+  var read_direct:DirectBuffer = null
   var read_direct_pos = 0
 
   var next_action:FrameReader = read_action
@@ -480,9 +480,9 @@ class StompCodec extends ProtocolCodec {
           // lets try to keep the content of big message outside of the JVM's garbage collection
           // to keep the number of GCs down when moving big messages.
           def is_message = action == SEND || action == MESSAGE
-          if( length > 1024 && zero_copy_buffer_allocator!=null && is_message) {
+          if( length > 1024 && direct_buffer_allocator!=null && is_message) {
 
-            read_direct = zero_copy_buffer_allocator.alloc(length)
+            read_direct = direct_buffer_allocator.alloc(length)
 
             val dup = buffer.duplicate
             dup.position(read_start)
@@ -529,7 +529,7 @@ class StompCodec extends ProtocolCodec {
     null
   }
 
-  def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:ZeroCopyBuffer):FrameReader = (buffer)=> {
+  def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:DirectBuffer):FrameReader = (buffer)=> {
     if( read_frame_terminator(buffer, contentLength) ) {
       next_action = read_action
       new StompFrame(ascii(action), headers.toList, ZeroCopyContent(ma))

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Mon Aug 29 17:43:03 2011
@@ -21,7 +21,7 @@ import collection.mutable.ListBuffer
 import java.lang.{String, Class}
 import org.apache.activemq.apollo.broker._
 import java.io.OutputStream
-import org.apache.activemq.apollo.broker.store.ZeroCopyBuffer
+import org.apache.activemq.apollo.broker.store.DirectBuffer
 import org.apache.activemq.apollo.dto.DestinationDTO
 
 /**
@@ -194,7 +194,7 @@ case class BufferContent(content:Buffer)
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class ZeroCopyContent(zero_copy_buffer:ZeroCopyBuffer) extends StompContent {
+case class ZeroCopyContent(zero_copy_buffer:DirectBuffer) extends StompContent {
   def length = zero_copy_buffer.size-1
 
   def writeTo(os:OutputStream) = {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Aug 29 17:43:03 2011
@@ -795,11 +795,7 @@ class StompProtocolHandler extends Proto
       }
 
       connection_sink.offer(StompFrame(CONNECTED,connected_headers.toList))
-
-      if( this.host.store!=null && this.host.store.zero_copy_buffer_allocator!=null ) {
-        val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
-        wf.zero_copy_buffer_allocator = this.host.store.zero_copy_buffer_allocator
-      }
+      codec.direct_buffer_allocator = this.host.direct_buffer_allocator
     }
 
     reset {

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Mon Aug 29 17:43:03 2011
@@ -96,7 +96,7 @@
     <xbean-version>3.4</xbean-version>
     <felix-version>1.0.0</felix-version>
 
-    <hawtdispatch-version>1.4</hawtdispatch-version>
+    <hawtdispatch-version>1.5-SNAPSHOT</hawtdispatch-version>
     <hawtbuf-version>1.6</hawtbuf-version>
     
     <jdbm-version>2.0.1</jdbm-version>