You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/07 15:13:00 UTC
svn commit: r1056328 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/
apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/brok...
Author: chirino
Date: Fri Jan 7 14:12:59 2011
New Revision: 1056328
URL: http://svn.apache.org/viewvc?rev=1056328&view=rev
Log:
Rename DirectBuffer* to ZeroCopy* as it's more descriptive of what it's trying to accomplish (folks might have thought DirectBuffer was for direct io or something).
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
- copied, changed from r1056135, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala
- copied, changed from r1056135, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
Removed:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Jan 7 14:12:59 2011
@@ -32,7 +32,7 @@ import org.apache.activemq.apollo.util.O
import org.apache.activemq.apollo.util.path.{Path, PathParser}
import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO, BindingDTO, VirtualHostDTO}
import security.{AclAuthorizer, JaasAuthenticator, Authenticator, Authorizer}
-import org.apache.activemq.apollo.broker.store.{DirectBufferAllocator, Store, StoreFactory}
+import org.apache.activemq.apollo.broker.store.{ZeroCopyBufferAllocator, Store, StoreFactory}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala (from r1056135, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala&r1=1056135&r2=1056328&rev=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala Fri Jan 7 14:12:59 2011
@@ -17,111 +17,19 @@
package org.apache.activemq.apollo.broker.store
import org.fusesource.hawtdispatch._
+import org.fusesource.hawtdispatch.internal.DispatcherConfig
+import org.fusesource.hawtdispatch.BaseRetained
import java.nio.channels.{FileChannel, WritableByteChannel, ReadableByteChannel}
import java.nio.ByteBuffer
import java.io._
import org.apache.activemq.apollo.util._
-import org.fusesource.hawtdispatch.internal.DispatcherConfig
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait DirectBufferAllocator {
- def alloc(size:Int):DirectBuffer
-}
/**
- * <p>
- * A DirectBuffer is a reference counted buffer on
- * temp storage designed to be accessed with direct io.
- * </p>
+ * <p>Tracks allocated space</p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait DirectBuffer extends Retained {
-
- def size:Int
-
- def remaining(from_position: Int): Int
-
- def read(target: OutputStream):Unit
-
- def read(src: Int, target: WritableByteChannel): Int
-
- def write(src:ReadableByteChannel, target:Int): Int
-
- def write(src:ByteBuffer, target:Int):Int
-
- def link(target:File):Long
-}
-
-trait FileDirectBufferTrait extends DirectBuffer {
-
- def offset:Long
- def channel:FileChannel
-
- def remaining(pos: Int): Int = size-pos
-
- def read(src: Int, target: WritableByteChannel): Int = {
- assert(retained > 0)
- val count: Int = remaining(src)
- assert(count>=0)
- channel.transferTo(offset+src, count, target).toInt
- }
-
- def read(target: OutputStream): Unit = {
- assert(retained > 0)
- val b = ByteBuffer.allocate(size.min(1024*4))
- var pos = 0
- while( remaining(pos)> 0 ) {
- val count = channel.read(b, offset+pos)
- if( count == -1 ) {
- throw new EOFException()
- }
- target.write(b.array, 0, count)
- pos += count
- b.clear
- }
- }
-
- def write(src: ReadableByteChannel, target:Int): Int = {
- assert(retained > 0)
- val count: Int = remaining(target)
- assert(count>=0)
- channel.transferFrom(src, offset+target, count).toInt
- }
-
- def write(src: ByteBuffer, target: Int): Int = {
- assert(retained > 0)
- val diff = src.remaining - remaining(target)
- if( diff > 0 ) {
- src.limit(src.limit-diff)
- }
- try {
- channel.write(src, offset+target).toInt
- } finally {
- if( diff > 0 ) {
- src.limit(src.limit+diff)
- }
- }
- }
-
- def link(target: File): Long = {
- assert(retained > 0)
- // TODO: implement with a real file system hard link
- // to get copy on write goodness.
- import FileSupport._
- using(new FileOutputStream(target).getChannel) { target=>
- val count = channel.transferTo(offset, size, target)
- assert( count == size )
- }
- return 0;
- }
-}
-
case class Allocation(offset:Long, size:Long) extends Ordered[Allocation] {
var _free_func: (Allocation)=>Unit = _
@@ -170,6 +78,11 @@ trait Allocator {
}
}
+/**
+ * <p>Manages allocation space using a couple of trees to track the free areas.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class TreeAllocator(range:Allocation) extends Allocator {
// list of the free allocation areas. Sorted by size then offset
@@ -292,7 +205,8 @@ class TreeAllocator(range:Allocation) ex
}
/**
- * Helps minimize the active page set.
+ * Helps minimize the active page set by allocating in areas
+ * which had previously been allocated.
*/
class ActiveAllocator(val range:Allocation) extends Allocator {
@@ -325,12 +239,69 @@ class ActiveAllocator(val range:Allocati
}
/**
- * <p>
- * </p>
+ * <p>A ZeroCopyBuffer which was allocated on a file.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait FileZeroCopyBufferTrait extends ZeroCopyBuffer {
+
+ def offset:Long
+ def channel:FileChannel
+
+ def remaining(pos: Int): Int = size-pos
+
+ def read(src: Int, target: WritableByteChannel): Int = {
+ assert(retained > 0)
+ val count: Int = remaining(src)
+ assert(count>=0)
+ channel.transferTo(offset+src, count, target).toInt
+ }
+
+ def read(target: OutputStream): Unit = {
+ assert(retained > 0)
+ val b = ByteBuffer.allocate(size.min(1024*4))
+ var pos = 0
+ while( remaining(pos)> 0 ) {
+ val count = channel.read(b, offset+pos)
+ if( count == -1 ) {
+ throw new EOFException()
+ }
+ target.write(b.array, 0, count)
+ pos += count
+ b.clear
+ }
+ }
+
+ def write(src: ReadableByteChannel, target:Int): Int = {
+ assert(retained > 0)
+ val count: Int = remaining(target)
+ assert(count>=0)
+ channel.transferFrom(src, offset+target, count).toInt
+ }
+
+ def write(src: ByteBuffer, target: Int): Int = {
+ assert(retained > 0)
+ val diff = src.remaining - remaining(target)
+ if( diff > 0 ) {
+ src.limit(src.limit-diff)
+ }
+ try {
+ channel.write(src, offset+target).toInt
+ } finally {
+ if( diff > 0 ) {
+ src.limit(src.limit+diff)
+ }
+ }
+ }
+
+}
+
+/**
+ * <p>A ZeroCopyBufferAllocator which allocates on files.</p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class FileDirectBufferAllocator(val directory:File) extends DirectBufferAllocator {
+class FileZeroCopyBufferAllocator(val directory:File) extends ZeroCopyBufferAllocator {
// we use thread local allocators to
class AllocatorContext(val id:Int) {
@@ -357,7 +328,7 @@ class FileDirectBufferAllocator(val dire
channel.force(size_changed)
}
- class AllocationBuffer(val allocation:Allocation) extends BaseRetained with FileDirectBufferTrait {
+ class AllocationBuffer(val allocation:Allocation) extends BaseRetained with FileZeroCopyBufferTrait {
def channel: FileChannel = AllocatorContext.this.channel
def file = id
@@ -374,7 +345,7 @@ class FileDirectBufferAllocator(val dire
}
}
- def alloc(size: Int): DirectBuffer = current_context { ctx=>
+ def alloc(size: Int): ZeroCopyBuffer = current_context { ctx=>
val allocation = allocator.alloc(size)
assert(allocation!=null)
current_size = current_size.max(allocation.offset + allocation.size)
@@ -382,7 +353,7 @@ class FileDirectBufferAllocator(val dire
}
}
- def to_alloc_buffer(buffer:DirectBuffer) = buffer.asInstanceOf[AllocatorContext#AllocationBuffer]
+ def to_alloc_buffer(buffer:ZeroCopyBuffer) = buffer.asInstanceOf[AllocatorContext#AllocationBuffer]
val _current_allocator_context = new ThreadLocal[AllocatorContext]()
var contexts = Map[Int, AllocatorContext]()
@@ -414,7 +385,7 @@ class FileDirectBufferAllocator(val dire
contexts.get(file).get.sync
}
- def alloc(size: Int): DirectBuffer = current_context { ctx=>
+ def alloc(size: Int): ZeroCopyBuffer = current_context { ctx=>
ctx.alloc(size)
}
@@ -426,9 +397,9 @@ class FileDirectBufferAllocator(val dire
ctx.allocator.free(Allocation(offset, size))
}
- def view_buffer(file:Int, the_offset:Long, the_size:Int):DirectBuffer = {
+ def view_buffer(file:Int, the_offset:Long, the_size:Int):ZeroCopyBuffer = {
val the_channel = contexts.get(file).get.channel
- new BaseRetained with FileDirectBufferTrait {
+ new BaseRetained with FileZeroCopyBufferTrait {
def offset: Long = the_offset
def size: Int = the_size
val channel: FileChannel = the_channel
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala Fri Jan 7 14:12:59 2011
@@ -28,7 +28,7 @@ class MessageRecord {
var protocol: AsciiBuffer = _
var size = 0
var buffer: Buffer = _
- var direct_buffer: DirectBuffer = _
+ var zero_copy_buffer: ZeroCopyBuffer = _
var expiration = 0L
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Fri Jan 7 14:12:59 2011
@@ -40,10 +40,10 @@ trait Store extends ServiceTrait {
def get_store_status(callback:(StoreStatusDTO)=>Unit)
/**
- * @returns true if the store implementation can handle accepting
- * MessageRecords with DirectBuffers in them.
+ * @returns a ZeroCopyBufferAllocator if the store supports protocols
+ * using zero copy buffers when transferring messages.
*/
- def direct_buffer_allocator():DirectBufferAllocator = null
+ def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = null
/**
* Creates a store uow which is used to perform persistent
Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala (from r1056135, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala&r1=1056135&r2=1056328&rev=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala Fri Jan 7 14:12:59 2011
@@ -3,7 +3,7 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License") you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -16,19 +16,42 @@
*/
package org.apache.activemq.apollo.broker.store
-import org.fusesource.hawtbuf.AsciiBuffer
-import org.fusesource.hawtbuf.Buffer
+import java.nio.channels.{WritableByteChannel, ReadableByteChannel}
+import java.nio.ByteBuffer
+import java.io._
+import org.fusesource.hawtdispatch.Retained
/**
+ * <p>Allocates ZeroCopyBuffer objects</p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class MessageRecord {
+trait ZeroCopyBufferAllocator {
+ def alloc(size:Int):ZeroCopyBuffer
+}
+
+/**
+ * <p>
+ * A ZeroCopyBuffer is a reference counted buffer on
+ * temp storage.
+ *
+ * ON the
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait ZeroCopyBuffer extends Retained {
+
+ def size:Int
+
+ def remaining(from_position: Int): Int
+
+ def read(target: OutputStream):Unit
+
+ def read(src: Int, target: WritableByteChannel): Int
+
+ def write(src:ReadableByteChannel, target:Int): Int
- var key = -1L
- var protocol: AsciiBuffer = _
- var size = 0
- var buffer: Buffer = _
- var direct_buffer: DirectBuffer = _
- var expiration = 0L
+ def write(src:ByteBuffer, target:Int):Int
}
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Fri Jan 7 14:12:59 2011
@@ -154,7 +154,7 @@ class JDBM2Client(store: JDBM2Store) {
var last_message_key = 0L
var last_queue_key = 0L
- var direct_buffer_allocator: FileDirectBufferAllocator = _
+ var zero_copy_buffer_allocator: FileZeroCopyBufferAllocator = _
def zero_copy_dir = {
import FileSupport._
@@ -167,8 +167,8 @@ class JDBM2Client(store: JDBM2Store) {
config.directory.mkdirs
if( Option(config.zero_copy).map(_.booleanValue).getOrElse(false) ) {
- direct_buffer_allocator = new FileDirectBufferAllocator(zero_copy_dir)
- direct_buffer_allocator.start
+ zero_copy_buffer_allocator = new FileZeroCopyBufferAllocator(zero_copy_dir)
+ zero_copy_buffer_allocator.start
}
recman = RecordManagerFactory.createRecordManager((config.directory / "jdbm2").getCanonicalPath)
@@ -209,9 +209,9 @@ class JDBM2Client(store: JDBM2Store) {
last_message_key = Option(recman.getNamedObject("last_message_key")).map(_.longValue).getOrElse(0L)
last_queue_key = Option(recman.getNamedObject("last_queue_key")).map(_.longValue).getOrElse(0L)
- if( direct_buffer_allocator!=null ) {
+ if( zero_copy_buffer_allocator!=null ) {
lobs_db.cursor { (_,v)=>
- direct_buffer_allocator.alloc_at(v._1, v._2, v._3)
+ zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
true
}
}
@@ -222,9 +222,9 @@ class JDBM2Client(store: JDBM2Store) {
def stop() = {
recman.close
recman = null;
- if( direct_buffer_allocator!=null ) {
- direct_buffer_allocator.stop
- direct_buffer_allocator = null
+ if( zero_copy_buffer_allocator!=null ) {
+ zero_copy_buffer_allocator.stop
+ zero_copy_buffer_allocator = null
}
}
@@ -303,10 +303,10 @@ class JDBM2Client(store: JDBM2Store) {
gc.foreach { key=>
message_refs_db.remove(key)
messages_db.remove(key)
- if( direct_buffer_allocator!=null ){
+ if( zero_copy_buffer_allocator!=null ){
val location = lobs_db.find(key)
if( location!=null ) {
- direct_buffer_allocator.free(location._1, location._2, location._3)
+ zero_copy_buffer_allocator.free(location._1, location._2, location._3)
}
}
}
@@ -346,18 +346,18 @@ class JDBM2Client(store: JDBM2Store) {
def store(uows: Seq[JDBM2Store#DelayableUOW], callback:Runnable) {
transaction {
- var needs_direct_buffer_sync = Set[Int]()
+ var zcp_files_to_sync = Set[Int]()
uows.foreach { uow =>
uow.actions.foreach { case (msg, action) =>
val message_record = action.messageRecord
if (message_record != null) {
- val pb = if( message_record.direct_buffer != null ) {
+ val pb = if( message_record.zero_copy_buffer != null ) {
val r = to_pb(action.messageRecord).copy
- val buffer = direct_buffer_allocator.to_alloc_buffer(message_record.direct_buffer)
+ val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
lobs_db.put(message_record.key, (buffer.file, buffer.offset, buffer.size))
- needs_direct_buffer_sync += buffer.file
+ zcp_files_to_sync += buffer.file
r.setDirect(true)
r.freeze
} else {
@@ -383,8 +383,8 @@ class JDBM2Client(store: JDBM2Store) {
}
}
- if( direct_buffer_allocator!=null ) {
- needs_direct_buffer_sync.foreach(direct_buffer_allocator.sync(_))
+ if( zero_copy_buffer_allocator!=null ) {
+ zcp_files_to_sync.foreach(zero_copy_buffer_allocator.sync(_))
}
}
callback.run
@@ -450,7 +450,7 @@ class JDBM2Client(store: JDBM2Store) {
val rc = from_pb(pb)
if( pb.getDirect ) {
val location = lobs_db.find(message_key)
- rc.direct_buffer = direct_buffer_allocator.view_buffer(location._1, location._2, location._3)
+ rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(location._1, location._2, location._3)
}
rc
}
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Fri Jan 7 14:12:59 2011
@@ -80,7 +80,7 @@ class JDBM2Store extends DelayingStoreSu
protected def get_next_msg_key = next_msg_key.getAndIncrement
- override def direct_buffer_allocator():DirectBufferAllocator = client.direct_buffer_allocator
+ override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = client.zero_copy_buffer_allocator
protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
executor {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Fri Jan 7 14:12:59 2011
@@ -30,7 +30,7 @@ import org.apache.activemq.apollo.transp
import _root_.org.fusesource.hawtbuf._
import Buffer._
import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord}
+import org.apache.activemq.apollo.broker.store.{ZeroCopyBuffer, ZeroCopyBufferAllocator, MessageRecord}
object StompCodec extends Log {
val READ_BUFFFER_SIZE = 1024*64;
@@ -50,11 +50,11 @@ object StompCodec extends Log {
rc.size = frame.size
rc.expiration = message.expiration
- if( frame.content.isInstanceOf[DirectContent] ) {
- rc.direct_buffer = frame.content.asInstanceOf[DirectContent].direct_buffer
+ if( frame.content.isInstanceOf[ZeroCopyContent] ) {
+ rc.zero_copy_buffer = frame.content.asInstanceOf[ZeroCopyContent].zero_copy_buffer
}
- def buffer_size = if (rc.direct_buffer!=null) { frame.size - (rc.direct_buffer.size - 1) } else { frame.size }
+ def buffer_size = if (rc.zero_copy_buffer!=null) { frame.size - (rc.zero_copy_buffer.size - 1) } else { frame.size }
val os = new ByteArrayOutputStream(buffer_size)
frame.action.writeTo(os)
@@ -87,7 +87,7 @@ object StompCodec extends Log {
os.write(NEWLINE)
}
os.write(NEWLINE)
- if ( rc.direct_buffer==null ) {
+ if ( rc.zero_copy_buffer==null ) {
frame.content.writeTo(os)
}
}
@@ -143,10 +143,10 @@ object StompCodec extends Log {
line = read_line
}
- if( message.direct_buffer==null ) {
+ if( message.zero_copy_buffer==null ) {
new StompFrameMessage(new StompFrame(action, headers.toList, BufferContent(buffer)))
} else {
- new StompFrameMessage(new StompFrame(action, headers.toList, DirectContent(message.direct_buffer)))
+ new StompFrameMessage(new StompFrame(action, headers.toList, ZeroCopyContent(message.zero_copy_buffer)))
}
}
@@ -157,7 +157,7 @@ class StompCodec extends ProtocolCodec w
import StompCodec._
override protected def log: Log = StompCodec
- var direct_buffer_allocator:DirectBufferAllocator = null
+ var zero_copy_buffer_allocator:ZeroCopyBufferAllocator = null
implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
implicit def wrap(x: Byte) = {
@@ -178,10 +178,10 @@ class StompCodec extends ProtocolCodec w
var write_channel:WritableByteChannel = null
var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
- var next_write_direct:DirectBuffer = null
+ var next_write_direct:ZeroCopyBuffer = null
var write_buffer = ByteBuffer.allocate(0)
- var write_direct:DirectBuffer = null
+ var write_direct:ZeroCopyBuffer = null
var write_direct_pos = 0
def is_full = next_write_direct!=null || next_write_buffer.size() >= (write_buffer_size >> 2)
@@ -250,9 +250,9 @@ class StompCodec extends ProtocolCodec w
os.write(NEWLINE)
frame.content match {
- case x:DirectContent=>
+ case x:ZeroCopyContent=>
assert(next_write_direct==null)
- next_write_direct = x.direct_buffer
+ next_write_direct = x.zero_copy_buffer
case x:BufferContent=>
x.content.writeTo(os)
END_OF_FRAME_BUFFER.writeTo(os)
@@ -319,7 +319,7 @@ class StompCodec extends ProtocolCodec w
var read_end = 0
var read_start = 0
- var read_direct:DirectBuffer = null
+ var read_direct:ZeroCopyBuffer = null
var read_direct_pos = 0
var next_action:FrameReader = read_action
@@ -482,9 +482,9 @@ class StompCodec extends ProtocolCodec w
// lets try to keep the content of big message outside of the JVM's garbage collection
// to keep the number of GCs down when moving big messages.
def is_message = action == SEND || action == MESSAGE
- if( length > 1024 && direct_buffer_allocator!=null && is_message) {
+ if( length > 1024 && zero_copy_buffer_allocator!=null && is_message) {
- read_direct = direct_buffer_allocator.alloc(length)
+ read_direct = zero_copy_buffer_allocator.alloc(length)
val dup = buffer.duplicate
dup.position(read_start)
@@ -531,10 +531,10 @@ class StompCodec extends ProtocolCodec w
null
}
- def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:DirectBuffer):FrameReader = (buffer)=> {
+ def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:ZeroCopyBuffer):FrameReader = (buffer)=> {
if( read_frame_terminator(buffer, contentLength) ) {
next_action = read_action
- new StompFrame(ascii(action), headers.toList, DirectContent(ma))
+ new StompFrame(ascii(action), headers.toList, ZeroCopyContent(ma))
} else {
null
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Fri Jan 7 14:12:59 2011
@@ -16,16 +16,12 @@
*/
package org.apache.activemq.apollo.stomp
-import _root_.java.util.LinkedList
-import _root_.org.apache.activemq.apollo.filter.{Expression, Filterable}
import _root_.org.fusesource.hawtbuf._
import collection.mutable.ListBuffer
import java.lang.{String, Class}
import org.apache.activemq.apollo.broker._
-import org.apache.activemq.apollo.util._
-import org.fusesource.hawtdispatch.BaseRetained
-import java.io.{OutputStream, DataOutput}
-import org.apache.activemq.apollo.broker.store.DirectBuffer
+import java.io.OutputStream
+import org.apache.activemq.apollo.broker.store.ZeroCopyBuffer
/**
*
@@ -103,7 +99,7 @@ case class StompFrameMessage(frame:Stomp
} else {
null
}
- case x:DirectContent =>
+ case x:ZeroCopyContent =>
null
case NilContent =>
if( toType == classOf[String] ) {
@@ -203,22 +199,22 @@ case class BufferContent(content:Buffer)
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-case class DirectContent(direct_buffer:DirectBuffer) extends StompContent {
- def length = direct_buffer.size-1
+case class ZeroCopyContent(zero_copy_buffer:ZeroCopyBuffer) extends StompContent {
+ def length = zero_copy_buffer.size-1
def writeTo(os:OutputStream) = {
val buff = new Array[Byte](1024*4)
- var remaining = direct_buffer.size-1
+ var remaining = zero_copy_buffer.size-1
while( remaining> 0 ) {
val c = remaining.min(buff.length)
- direct_buffer.read(os)
+ zero_copy_buffer.read(os)
os.write(buff, 0, c)
remaining -= c
}
}
def buffer:Buffer = {
- val rc = new DataByteArrayOutputStream(direct_buffer.size-1)
+ val rc = new DataByteArrayOutputStream(zero_copy_buffer.size-1)
writeTo(rc)
rc.toBuffer
}
@@ -227,8 +223,8 @@ case class DirectContent(direct_buffer:D
buffer.utf8
}
- override def retain = direct_buffer.retain
- override def release = direct_buffer.release
+ override def retain = zero_copy_buffer.retain
+ override def release = zero_copy_buffer.release
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1056328&r1=1056327&r2=1056328&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Jan 7 14:12:59 2011
@@ -571,9 +571,9 @@ class StompProtocolHandler extends Proto
connection_sink.offer(StompFrame(CONNECTED,connected_headers.toList))
- if( this.host.store!=null && this.host.store.direct_buffer_allocator!=null ) {
+ if( this.host.store!=null && this.host.store.zero_copy_buffer_allocator!=null ) {
val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
- wf.direct_buffer_allocator = this.host.store.direct_buffer_allocator
+ wf.zero_copy_buffer_allocator = this.host.store.zero_copy_buffer_allocator
}
}