You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/07 15:13:00 UTC

svn commit: r1056328 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/brok...

Author: chirino
Date: Fri Jan  7 14:12:59 2011
New Revision: 1056328

URL: http://svn.apache.org/viewvc?rev=1056328&view=rev
Log:
Rename DirectBuffer* to ZeroCopy* as it's more descriptive of what it's trying to accomplish (folks might have thought DirectBuffer was for direct io or something).

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
      - copied, changed from r1056135, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala
      - copied, changed from r1056135, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Jan  7 14:12:59 2011
@@ -32,7 +32,7 @@ import org.apache.activemq.apollo.util.O
 import org.apache.activemq.apollo.util.path.{Path, PathParser}
 import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO, BindingDTO, VirtualHostDTO}
 import security.{AclAuthorizer, JaasAuthenticator, Authenticator, Authorizer}
-import org.apache.activemq.apollo.broker.store.{DirectBufferAllocator, Store, StoreFactory}
+import org.apache.activemq.apollo.broker.store.{ZeroCopyBufferAllocator, Store, StoreFactory}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala (from r1056135, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala&r1=1056135&r2=1056328&rev=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala Fri Jan  7 14:12:59 2011
@@ -17,111 +17,19 @@
 package org.apache.activemq.apollo.broker.store
 
 import org.fusesource.hawtdispatch._
+import org.fusesource.hawtdispatch.internal.DispatcherConfig
+import org.fusesource.hawtdispatch.BaseRetained
 import java.nio.channels.{FileChannel, WritableByteChannel, ReadableByteChannel}
 import java.nio.ByteBuffer
 import java.io._
 import org.apache.activemq.apollo.util._
-import org.fusesource.hawtdispatch.internal.DispatcherConfig
 
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait DirectBufferAllocator {
-  def alloc(size:Int):DirectBuffer
-}
 
 /**
- * <p>
- * A DirectBuffer is a reference counted buffer on
- * temp storage designed to be accessed with direct io.
- * </p>
+ * <p>Tracks allocated space</p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait DirectBuffer extends Retained {
-
-  def size:Int
-
-  def remaining(from_position: Int): Int
-
-  def read(target: OutputStream):Unit
-
-  def read(src: Int, target: WritableByteChannel): Int
-
-  def write(src:ReadableByteChannel, target:Int): Int
-
-  def write(src:ByteBuffer, target:Int):Int
-
-  def link(target:File):Long
-}
-
-trait FileDirectBufferTrait extends DirectBuffer {
-
-  def offset:Long
-  def channel:FileChannel
-
-  def remaining(pos: Int): Int = size-pos
-
-  def read(src: Int, target: WritableByteChannel): Int = {
-    assert(retained > 0)
-    val count: Int = remaining(src)
-    assert(count>=0)
-    channel.transferTo(offset+src, count, target).toInt
-  }
-
-  def read(target: OutputStream): Unit = {
-    assert(retained > 0)
-    val b = ByteBuffer.allocate(size.min(1024*4))
-    var pos = 0
-    while( remaining(pos)> 0 ) {
-      val count = channel.read(b, offset+pos)
-      if( count == -1 ) {
-        throw new EOFException()
-      }
-      target.write(b.array, 0, count)
-      pos += count
-      b.clear
-    }
-  }
-
-  def write(src: ReadableByteChannel, target:Int): Int = {
-    assert(retained > 0)
-    val count: Int = remaining(target)
-    assert(count>=0)
-    channel.transferFrom(src, offset+target, count).toInt
-  }
-
-  def write(src: ByteBuffer, target: Int): Int = {
-    assert(retained > 0)
-    val diff = src.remaining - remaining(target)
-    if( diff > 0 ) {
-      src.limit(src.limit-diff)
-    }
-    try {
-      channel.write(src, offset+target).toInt
-    } finally {
-      if( diff > 0 ) {
-        src.limit(src.limit+diff)
-      }
-    }
-  }
-
-  def link(target: File): Long = {
-    assert(retained > 0)
-    // TODO: implement with a real file system hard link
-    // to get copy on write goodness.
-    import FileSupport._
-    using(new FileOutputStream(target).getChannel) { target=>
-      val count = channel.transferTo(offset, size, target)
-      assert( count == size )
-    }
-    return 0;
-  }
-}
-
 case class Allocation(offset:Long, size:Long) extends Ordered[Allocation] {
 
   var _free_func: (Allocation)=>Unit = _
@@ -170,6 +78,11 @@ trait Allocator {
   }
 }
 
+/**
+ * <p>Manages allocation space using a couple of trees to track the free areas.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
 class TreeAllocator(range:Allocation) extends Allocator {
 
   // list of the free allocation areas.  Sorted by size then offset
@@ -292,7 +205,8 @@ class TreeAllocator(range:Allocation) ex
 }
 
 /**
- * Helps minimize the active page set.
+ * Helps minimize the active page set by allocating in areas
+ * which had previously been allocated.
  */
 class ActiveAllocator(val range:Allocation) extends Allocator {
 
@@ -325,12 +239,69 @@ class ActiveAllocator(val range:Allocati
 }
 
 /**
- * <p>
- * </p>
+ * <p>A ZeroCopyBuffer which was allocated on a file.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait FileZeroCopyBufferTrait extends ZeroCopyBuffer {
+
+  def offset:Long
+  def channel:FileChannel
+
+  def remaining(pos: Int): Int = size-pos
+
+  def read(src: Int, target: WritableByteChannel): Int = {
+    assert(retained > 0)
+    val count: Int = remaining(src)
+    assert(count>=0)
+    channel.transferTo(offset+src, count, target).toInt
+  }
+
+  def read(target: OutputStream): Unit = {
+    assert(retained > 0)
+    val b = ByteBuffer.allocate(size.min(1024*4))
+    var pos = 0
+    while( remaining(pos)> 0 ) {
+      val count = channel.read(b, offset+pos)
+      if( count == -1 ) {
+        throw new EOFException()
+      }
+      target.write(b.array, 0, count)
+      pos += count
+      b.clear
+    }
+  }
+
+  def write(src: ReadableByteChannel, target:Int): Int = {
+    assert(retained > 0)
+    val count: Int = remaining(target)
+    assert(count>=0)
+    channel.transferFrom(src, offset+target, count).toInt
+  }
+
+  def write(src: ByteBuffer, target: Int): Int = {
+    assert(retained > 0)
+    val diff = src.remaining - remaining(target)
+    if( diff > 0 ) {
+      src.limit(src.limit-diff)
+    }
+    try {
+      channel.write(src, offset+target).toInt
+    } finally {
+      if( diff > 0 ) {
+        src.limit(src.limit+diff)
+      }
+    }
+  }
+
+}
+
+/**
+ * <p>A ZeroCopyBufferAllocator which allocates on files.</p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class FileDirectBufferAllocator(val directory:File) extends DirectBufferAllocator {
+class FileZeroCopyBufferAllocator(val directory:File) extends ZeroCopyBufferAllocator {
 
   // we use thread local allocators to
   class AllocatorContext(val id:Int) {
@@ -357,7 +328,7 @@ class FileDirectBufferAllocator(val dire
       channel.force(size_changed)
     }
 
-    class AllocationBuffer(val allocation:Allocation) extends BaseRetained with FileDirectBufferTrait {
+    class AllocationBuffer(val allocation:Allocation) extends BaseRetained with FileZeroCopyBufferTrait {
       def channel: FileChannel = AllocatorContext.this.channel
 
       def file = id
@@ -374,7 +345,7 @@ class FileDirectBufferAllocator(val dire
       }
     }
 
-    def alloc(size: Int): DirectBuffer = current_context { ctx=>
+    def alloc(size: Int): ZeroCopyBuffer = current_context { ctx=>
       val allocation = allocator.alloc(size)
       assert(allocation!=null)
       current_size = current_size.max(allocation.offset + allocation.size)
@@ -382,7 +353,7 @@ class FileDirectBufferAllocator(val dire
     }
   }
 
-  def to_alloc_buffer(buffer:DirectBuffer) = buffer.asInstanceOf[AllocatorContext#AllocationBuffer]
+  def to_alloc_buffer(buffer:ZeroCopyBuffer) = buffer.asInstanceOf[AllocatorContext#AllocationBuffer]
 
   val _current_allocator_context = new ThreadLocal[AllocatorContext]()
   var contexts = Map[Int, AllocatorContext]()
@@ -414,7 +385,7 @@ class FileDirectBufferAllocator(val dire
     contexts.get(file).get.sync
   }
 
-  def alloc(size: Int): DirectBuffer = current_context { ctx=>
+  def alloc(size: Int): ZeroCopyBuffer = current_context { ctx=>
     ctx.alloc(size)
   }
 
@@ -426,9 +397,9 @@ class FileDirectBufferAllocator(val dire
     ctx.allocator.free(Allocation(offset, size))
   }
 
-  def view_buffer(file:Int, the_offset:Long, the_size:Int):DirectBuffer = {
+  def view_buffer(file:Int, the_offset:Long, the_size:Int):ZeroCopyBuffer = {
     val the_channel = contexts.get(file).get.channel
-    new BaseRetained with FileDirectBufferTrait {
+    new BaseRetained with FileZeroCopyBufferTrait {
       def offset: Long = the_offset
       def size: Int = the_size
       val channel: FileChannel = the_channel

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala Fri Jan  7 14:12:59 2011
@@ -28,7 +28,7 @@ class MessageRecord {
   var protocol: AsciiBuffer = _
   var size = 0
   var buffer: Buffer = _
-  var direct_buffer: DirectBuffer = _
+  var zero_copy_buffer: ZeroCopyBuffer = _
   var expiration = 0L
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Fri Jan  7 14:12:59 2011
@@ -40,10 +40,10 @@ trait Store extends ServiceTrait {
   def get_store_status(callback:(StoreStatusDTO)=>Unit)
 
   /**
-   * @returns true if the store implementation can handle accepting
-   *          MessageRecords with DirectBuffers in them.
+   * @return a ZeroCopyBufferAllocator if the store supports protocols
+   *         using zero copy buffers when transferring messages; null otherwise.
    */
-  def direct_buffer_allocator():DirectBufferAllocator = null
+  def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = null
 
   /**
    * Creates a store uow which is used to perform persistent

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala (from r1056135, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala&r1=1056135&r2=1056328&rev=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala Fri Jan  7 14:12:59 2011
@@ -3,7 +3,7 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License") you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *      http://www.apache.org/licenses/LICENSE-2.0
@@ -16,19 +16,42 @@
  */
 package org.apache.activemq.apollo.broker.store
 
-import org.fusesource.hawtbuf.AsciiBuffer
-import org.fusesource.hawtbuf.Buffer
+import java.nio.channels.{WritableByteChannel, ReadableByteChannel}
+import java.nio.ByteBuffer
+import java.io._
+import org.fusesource.hawtdispatch.Retained
 
 /**
+ * <p>Allocates ZeroCopyBuffer objects</p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class MessageRecord {
+trait ZeroCopyBufferAllocator {
+  def alloc(size:Int):ZeroCopyBuffer
+}
+
+/**
+ * <p>
+ * A ZeroCopyBuffer is a reference counted buffer on
+ * temp storage.
+ *
+ * ON the
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait ZeroCopyBuffer extends Retained {
+
+  def size:Int
+
+  def remaining(from_position: Int): Int
+
+  def read(target: OutputStream):Unit
+
+  def read(src: Int, target: WritableByteChannel): Int
+
+  def write(src:ReadableByteChannel, target:Int): Int
 
-  var key = -1L
-  var protocol: AsciiBuffer = _
-  var size = 0
-  var buffer: Buffer = _
-  var direct_buffer: DirectBuffer = _
-  var expiration = 0L
+  def write(src:ByteBuffer, target:Int):Int
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Fri Jan  7 14:12:59 2011
@@ -154,7 +154,7 @@ class JDBM2Client(store: JDBM2Store) {
   var last_message_key = 0L
   var last_queue_key = 0L
 
-  var direct_buffer_allocator: FileDirectBufferAllocator = _
+  var zero_copy_buffer_allocator: FileZeroCopyBufferAllocator = _
 
   def zero_copy_dir = {
     import FileSupport._
@@ -167,8 +167,8 @@ class JDBM2Client(store: JDBM2Store) {
     config.directory.mkdirs
 
     if( Option(config.zero_copy).map(_.booleanValue).getOrElse(false) ) {
-      direct_buffer_allocator = new FileDirectBufferAllocator(zero_copy_dir)
-      direct_buffer_allocator.start
+      zero_copy_buffer_allocator = new FileZeroCopyBufferAllocator(zero_copy_dir)
+      zero_copy_buffer_allocator.start
     }
 
     recman = RecordManagerFactory.createRecordManager((config.directory / "jdbm2").getCanonicalPath)
@@ -209,9 +209,9 @@ class JDBM2Client(store: JDBM2Store) {
       last_message_key = Option(recman.getNamedObject("last_message_key")).map(_.longValue).getOrElse(0L)
       last_queue_key = Option(recman.getNamedObject("last_queue_key")).map(_.longValue).getOrElse(0L)
 
-      if( direct_buffer_allocator!=null ) {
+      if( zero_copy_buffer_allocator!=null ) {
         lobs_db.cursor { (_,v)=>
-          direct_buffer_allocator.alloc_at(v._1, v._2, v._3)
+          zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
           true
         }
       }
@@ -222,9 +222,9 @@ class JDBM2Client(store: JDBM2Store) {
   def stop() = {
     recman.close
     recman = null;
-    if( direct_buffer_allocator!=null ) {
-      direct_buffer_allocator.stop
-      direct_buffer_allocator = null
+    if( zero_copy_buffer_allocator!=null ) {
+      zero_copy_buffer_allocator.stop
+      zero_copy_buffer_allocator = null
     }
   }
 
@@ -303,10 +303,10 @@ class JDBM2Client(store: JDBM2Store) {
       gc.foreach { key=>
         message_refs_db.remove(key)
         messages_db.remove(key)
-        if( direct_buffer_allocator!=null ){
+        if( zero_copy_buffer_allocator!=null ){
           val location = lobs_db.find(key)
           if( location!=null ) {
-            direct_buffer_allocator.free(location._1, location._2, location._3)
+            zero_copy_buffer_allocator.free(location._1, location._2, location._3)
           }
         }
       }
@@ -346,18 +346,18 @@ class JDBM2Client(store: JDBM2Store) {
 
   def store(uows: Seq[JDBM2Store#DelayableUOW], callback:Runnable) {
     transaction {
-      var needs_direct_buffer_sync = Set[Int]()
+      var zcp_files_to_sync = Set[Int]()
       uows.foreach { uow =>
         uow.actions.foreach { case (msg, action) =>
 
           val message_record = action.messageRecord
           if (message_record != null) {
 
-            val pb = if( message_record.direct_buffer != null ) {
+            val pb = if( message_record.zero_copy_buffer != null ) {
               val r = to_pb(action.messageRecord).copy
-              val buffer = direct_buffer_allocator.to_alloc_buffer(message_record.direct_buffer)
+              val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
               lobs_db.put(message_record.key, (buffer.file, buffer.offset, buffer.size))
-              needs_direct_buffer_sync += buffer.file
+              zcp_files_to_sync += buffer.file
               r.setDirect(true)
               r.freeze
             } else {
@@ -383,8 +383,8 @@ class JDBM2Client(store: JDBM2Store) {
 
         }
       }
-      if( direct_buffer_allocator!=null ) {
-        needs_direct_buffer_sync.foreach(direct_buffer_allocator.sync(_))
+      if( zero_copy_buffer_allocator!=null ) {
+        zcp_files_to_sync.foreach(zero_copy_buffer_allocator.sync(_))
       }
     }
     callback.run
@@ -450,7 +450,7 @@ class JDBM2Client(store: JDBM2Store) {
           val rc = from_pb(pb)
           if( pb.getDirect ) {
             val location = lobs_db.find(message_key)
-            rc.direct_buffer = direct_buffer_allocator.view_buffer(location._1, location._2, location._3)
+            rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(location._1, location._2, location._3)
           }
           rc
         }

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Fri Jan  7 14:12:59 2011
@@ -80,7 +80,7 @@ class JDBM2Store extends DelayingStoreSu
   
   protected def get_next_msg_key = next_msg_key.getAndIncrement
 
-  override def direct_buffer_allocator():DirectBufferAllocator = client.direct_buffer_allocator
+  override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = client.zero_copy_buffer_allocator
 
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
     executor {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Fri Jan  7 14:12:59 2011
@@ -30,7 +30,7 @@ import org.apache.activemq.apollo.transp
 import _root_.org.fusesource.hawtbuf._
 import Buffer._
 import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord}
+import org.apache.activemq.apollo.broker.store.{ZeroCopyBuffer, ZeroCopyBufferAllocator, MessageRecord}
 
 object StompCodec extends Log {
     val READ_BUFFFER_SIZE = 1024*64;
@@ -50,11 +50,11 @@ object StompCodec extends Log {
     rc.size = frame.size
     rc.expiration = message.expiration
 
-    if( frame.content.isInstanceOf[DirectContent] ) {
-      rc.direct_buffer = frame.content.asInstanceOf[DirectContent].direct_buffer
+    if( frame.content.isInstanceOf[ZeroCopyContent] ) {
+      rc.zero_copy_buffer = frame.content.asInstanceOf[ZeroCopyContent].zero_copy_buffer
     }
 
-    def buffer_size = if (rc.direct_buffer!=null) { frame.size - (rc.direct_buffer.size - 1) } else { frame.size }
+    def buffer_size = if (rc.zero_copy_buffer!=null) { frame.size - (rc.zero_copy_buffer.size - 1) } else { frame.size }
     val os = new ByteArrayOutputStream(buffer_size)
 
     frame.action.writeTo(os)
@@ -87,7 +87,7 @@ object StompCodec extends Log {
         os.write(NEWLINE)
       }
       os.write(NEWLINE)
-      if ( rc.direct_buffer==null ) {
+      if ( rc.zero_copy_buffer==null ) {
         frame.content.writeTo(os)
       }
     }
@@ -143,10 +143,10 @@ object StompCodec extends Log {
       line = read_line
     }
 
-    if( message.direct_buffer==null ) {
+    if( message.zero_copy_buffer==null ) {
       new StompFrameMessage(new StompFrame(action, headers.toList, BufferContent(buffer)))
     } else {
-      new StompFrameMessage(new StompFrame(action, headers.toList, DirectContent(message.direct_buffer)))
+      new StompFrameMessage(new StompFrame(action, headers.toList, ZeroCopyContent(message.zero_copy_buffer)))
     }
   }
 
@@ -157,7 +157,7 @@ class StompCodec extends ProtocolCodec w
   import StompCodec._
   override protected def log: Log = StompCodec
 
-  var direct_buffer_allocator:DirectBufferAllocator = null
+  var zero_copy_buffer_allocator:ZeroCopyBufferAllocator = null
 
   implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
   implicit def wrap(x: Byte) = {
@@ -178,10 +178,10 @@ class StompCodec extends ProtocolCodec w
   var write_channel:WritableByteChannel = null
 
   var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
-  var next_write_direct:DirectBuffer = null
+  var next_write_direct:ZeroCopyBuffer = null
 
   var write_buffer = ByteBuffer.allocate(0)
-  var write_direct:DirectBuffer = null
+  var write_direct:ZeroCopyBuffer = null
   var write_direct_pos = 0
 
   def is_full = next_write_direct!=null || next_write_buffer.size() >= (write_buffer_size >> 2)
@@ -250,9 +250,9 @@ class StompCodec extends ProtocolCodec w
       os.write(NEWLINE)
 
       frame.content match {
-        case x:DirectContent=>
+        case x:ZeroCopyContent=>
           assert(next_write_direct==null)
-          next_write_direct = x.direct_buffer
+          next_write_direct = x.zero_copy_buffer
         case x:BufferContent=>
           x.content.writeTo(os)
           END_OF_FRAME_BUFFER.writeTo(os)
@@ -319,7 +319,7 @@ class StompCodec extends ProtocolCodec w
   var read_end = 0
   var read_start = 0
 
-  var read_direct:DirectBuffer = null
+  var read_direct:ZeroCopyBuffer = null
   var read_direct_pos = 0
 
   var next_action:FrameReader = read_action
@@ -482,9 +482,9 @@ class StompCodec extends ProtocolCodec w
           // lets try to keep the content of big message outside of the JVM's garbage collection
           // to keep the number of GCs down when moving big messages.
           def is_message = action == SEND || action == MESSAGE
-          if( length > 1024 && direct_buffer_allocator!=null && is_message) {
+          if( length > 1024 && zero_copy_buffer_allocator!=null && is_message) {
 
-            read_direct = direct_buffer_allocator.alloc(length)
+            read_direct = zero_copy_buffer_allocator.alloc(length)
 
             val dup = buffer.duplicate
             dup.position(read_start)
@@ -531,10 +531,10 @@ class StompCodec extends ProtocolCodec w
     null
   }
 
-  def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:DirectBuffer):FrameReader = (buffer)=> {
+  def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:ZeroCopyBuffer):FrameReader = (buffer)=> {
     if( read_frame_terminator(buffer, contentLength) ) {
       next_action = read_action
-      new StompFrame(ascii(action), headers.toList, DirectContent(ma))
+      new StompFrame(ascii(action), headers.toList, ZeroCopyContent(ma))
     } else {
       null
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Fri Jan  7 14:12:59 2011
@@ -16,16 +16,12 @@
  */
 package org.apache.activemq.apollo.stomp
 
-import _root_.java.util.LinkedList
-import _root_.org.apache.activemq.apollo.filter.{Expression, Filterable}
 import _root_.org.fusesource.hawtbuf._
 import collection.mutable.ListBuffer
 import java.lang.{String, Class}
 import org.apache.activemq.apollo.broker._
-import org.apache.activemq.apollo.util._
-import org.fusesource.hawtdispatch.BaseRetained
-import java.io.{OutputStream, DataOutput}
-import org.apache.activemq.apollo.broker.store.DirectBuffer
+import java.io.OutputStream
+import org.apache.activemq.apollo.broker.store.ZeroCopyBuffer
 
 /**
  *
@@ -103,7 +99,7 @@ case class StompFrameMessage(frame:Stomp
         } else {
           null
         }
-      case x:DirectContent =>
+      case x:ZeroCopyContent =>
         null
       case NilContent =>
         if( toType == classOf[String] ) {
@@ -203,22 +199,22 @@ case class BufferContent(content:Buffer)
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class DirectContent(direct_buffer:DirectBuffer) extends StompContent {
-  def length = direct_buffer.size-1
+case class ZeroCopyContent(zero_copy_buffer:ZeroCopyBuffer) extends StompContent {
+  def length = zero_copy_buffer.size-1
 
   def writeTo(os:OutputStream) = {
     val buff = new Array[Byte](1024*4)
-    var remaining = direct_buffer.size-1
+    var remaining = zero_copy_buffer.size-1
     while( remaining> 0 ) {
       val c = remaining.min(buff.length)
-      direct_buffer.read(os)
+      zero_copy_buffer.read(os)
       os.write(buff, 0, c)
       remaining -= c
     }
   }
 
   def buffer:Buffer = {
-    val rc = new DataByteArrayOutputStream(direct_buffer.size-1)
+    val rc = new DataByteArrayOutputStream(zero_copy_buffer.size-1)
     writeTo(rc)
     rc.toBuffer
   }
@@ -227,8 +223,8 @@ case class DirectContent(direct_buffer:D
     buffer.utf8
   }
 
-  override def retain = direct_buffer.retain
-  override def release = direct_buffer.release
+  override def retain = zero_copy_buffer.retain
+  override def release = zero_copy_buffer.release
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Jan  7 14:12:59 2011
@@ -571,9 +571,9 @@ class StompProtocolHandler extends Proto
 
       connection_sink.offer(StompFrame(CONNECTED,connected_headers.toList))
 
-      if( this.host.store!=null && this.host.store.direct_buffer_allocator!=null ) {
+      if( this.host.store!=null && this.host.store.zero_copy_buffer_allocator!=null ) {
         val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
-        wf.direct_buffer_allocator = this.host.store.direct_buffer_allocator
+        wf.zero_copy_buffer_allocator = this.host.store.zero_copy_buffer_allocator
       }
     }