You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/07 01:17:18 UTC
svn commit: r1056135 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/proto/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/
apollo-jdbm2/src/main/scala/o...
Author: chirino
Date: Fri Jan 7 00:17:17 2011
New Revision: 1056135
URL: http://svn.apache.org/viewvc?rev=1056135&view=rev
Log:
initial pass at supporting zero copy buffers between stomp and jdbm2 store.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri Jan 7 00:17:17 2011
@@ -27,8 +27,8 @@ message MessagePB {
required bytes protocol = 2 [java_override_type = "AsciiBuffer"];
required int32 size = 3;
optional bytes value = 4;
- optional int64 streamKey = 5;
- optional int64 expiration = 6;
+ optional bool direct = 5;
+ optional sint64 expiration = 6;
}
message QueuePB {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Fri Jan 7 00:17:17 2011
@@ -139,7 +139,7 @@ object Delivery extends Sizer[Delivery]
def size(value:Delivery):Int = value.size
}
-class Delivery extends BaseRetained {
+class Delivery {
/**
* Total size of the delivery. Used for resource allocation tracking
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Jan 7 00:17:17 2011
@@ -981,8 +981,6 @@ class QueueEntry(val queue:Queue, val se
queue.swap_out_size_counter += size
queue.swap_out_item_counter += 1
- delivery.message.release
-
state = new Swapped(delivery.storeKey, size)
if( can_combine_with_prev ) {
getPrevious.as_swapped_range.combineNext
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala Fri Jan 7 00:17:17 2011
@@ -21,6 +21,8 @@ import java.nio.channels.{FileChannel, W
import java.nio.ByteBuffer
import java.io._
import org.apache.activemq.apollo.util._
+import org.fusesource.hawtdispatch.internal.DispatcherConfig
+
/**
* <p>
* </p>
@@ -120,7 +122,7 @@ trait FileDirectBufferTrait extends Dire
}
}
-case class Allocation(size:Long, offset:Long) extends Ordered[Allocation] {
+case class Allocation(offset:Long, size:Long) extends Ordered[Allocation] {
var _free_func: (Allocation)=>Unit = _
@@ -188,29 +190,64 @@ class TreeAllocator(range:Allocation) ex
}
val allocation = spot_entry.getKey
- free_by_size.remove(allocation)
+ free_by_size.removeEntry(spot_entry)
+ free_by_offset.remove(allocation.offset)
// might be the perfect size
- if( allocation.size == request ) {
- allocation._free_func = free
+ val rc = if( allocation.size == request ) {
allocation
} else {
// split the allocation..
var (first, second) = allocation.split(request)
- free_by_offset.remove(first.offset)
- free_by_offset.put(second.offset, second)
-
// put the free part in the free map.
+ free_by_offset.put(second.offset, second)
free_by_size.put(second, null)
- first._free_func = free
first
}
+ rc._free_func = free
+ rc
}
- def free(allocation:Allocation):Unit = {
+ def alloc_at(req:Allocation):Boolean = {
+ var spot_entry = free_by_offset.floorEntry(req.offset)
+ if( spot_entry== null ) {
+ return false
+ }
+
+ var spot = spot_entry.getValue
+ if( spot.offset+spot.size < req.offset+req.size ) {
+ return false
+ }
+
+ free_by_offset.removeEntry(spot_entry)
+ free_by_size.remove(spot)
+
+ // only need to put back if it was not exactly what we need.
+ if( spot != req ) {
+
+ // deal with excess at the front
+ if( spot.offset != req.offset ) {
+ val (prev, next) = spot.split(req.offset - spot.offset)
+ free_by_offset.put(prev.offset, prev)
+ free_by_size.put(prev, null)
+ spot = next
+ }
+
+ // deal with excess at the rear
+ if( spot.size != req.size ) {
+ val (prev, next) = spot.split(req.size)
+ free_by_offset.put(next.offset, next)
+ free_by_size.put(next, null)
+ }
+ }
+ req._free_func = free
+ true
+ }
+
+ def free(allocation:Allocation):Unit = {
var prev_e = free_by_offset.floorEntry(allocation.offset)
var next_e = if( prev_e!=null ) {
@@ -296,13 +333,34 @@ class ActiveAllocator(val range:Allocati
class FileDirectBufferAllocator(val directory:File) extends DirectBufferAllocator {
// we use thread local allocators to
- class AllocatorContext(val queue:DispatchQueue) {
+ class AllocatorContext(val id:Int) {
val allocator = new TreeAllocator(Allocation(0, Long.MaxValue))
- var channel:FileChannel = new RandomAccessFile(queue.getLabel, "rw").getChannel
+ var channel:FileChannel = new RandomAccessFile(new File(directory, ""+id+".data"), "rw").getChannel
+ var queue:DispatchQueue = _
+
+ var last_sync_size = channel.size
+ @volatile
+ var current_size = last_sync_size
+
+ def size_changed = this.synchronized {
+ val t = current_size
+ if( t != last_sync_size ) {
+ last_sync_size = t
+ true
+ } else {
+ false
+ }
+ }
+
+ def sync = {
+ channel.force(size_changed)
+ }
class AllocationBuffer(val allocation:Allocation) extends BaseRetained with FileDirectBufferTrait {
def channel: FileChannel = AllocatorContext.this.channel
+
+ def file = id
def offset: Long = allocation.offset
def size: Int = allocation.size.toInt
@@ -316,24 +374,74 @@ class FileDirectBufferAllocator(val dire
}
}
- def alloc(size: Int): DirectBuffer = with_allocator_context { ctx=>
+ def alloc(size: Int): DirectBuffer = current_context { ctx=>
val allocation = allocator.alloc(size)
assert(allocation!=null)
+ current_size = current_size.max(allocation.offset + allocation.size)
new AllocationBuffer(allocation)
}
}
+ def to_alloc_buffer(buffer:DirectBuffer) = buffer.asInstanceOf[AllocatorContext#AllocationBuffer]
+
val _current_allocator_context = new ThreadLocal[AllocatorContext]()
+ var contexts = Map[Int, AllocatorContext]()
- protected def start() = {
+ def start() = {
directory.mkdirs
+ val config = new DispatcherConfig()
+ for( i <- 0 until config.getThreads ) {
+ val ctx = new AllocatorContext(i)
+ contexts += i->ctx
+ getThreadQueue(i) {
+ ctx.queue = getCurrentThreadQueue
+ _current_allocator_context.set(ctx)
+ }
+ }
}
- def alloc(size: Int): DirectBuffer = with_allocator_context { ctx=>
+ def stop() = {
+ val config = new DispatcherConfig()
+ for( i <- 0 until config.getThreads ) {
+ contexts = Map()
+ getThreadQueue(i) {
+ _current_allocator_context.remove
+ }
+ }
+ }
+
+ def sync(file: Int) = {
+ contexts.get(file).get.sync
+ }
+
+ def alloc(size: Int): DirectBuffer = current_context { ctx=>
ctx.alloc(size)
}
- def with_allocator_context[T](func: (AllocatorContext)=>T):T = {
+ def alloc_at(file:Int, offset:Long, size:Int):Unit = context(file) { ctx=>
+ ctx.allocator.alloc_at(Allocation(offset, size))
+ }
+
+ def free(file:Int, offset:Long, size:Int):Unit = context(file) { ctx=>
+ ctx.allocator.free(Allocation(offset, size))
+ }
+
+ def view_buffer(file:Int, the_offset:Long, the_size:Int):DirectBuffer = {
+ val the_channel = contexts.get(file).get.channel
+ new BaseRetained with FileDirectBufferTrait {
+ def offset: Long = the_offset
+ def size: Int = the_size
+ val channel: FileChannel = the_channel
+ }
+ }
+
+ def context(i:Int)(func: (AllocatorContext)=>Unit):Unit= {
+ getThreadQueue(i) {
+ func(current_allocator_context)
+ }
+ }
+
+ def current_context[T](func: (AllocatorContext)=>T):T = {
if( getCurrentThreadQueue == null ) {
getGlobalQueue().future(func(current_allocator_context))()
} else {
@@ -341,14 +449,6 @@ class FileDirectBufferAllocator(val dire
}
}
- def current_allocator_context:AllocatorContext = {
- val thread_queue = getCurrentThreadQueue
- assert(thread_queue != null)
- var rc = _current_allocator_context.get
- if( rc==null ) {
- rc = new AllocatorContext(thread_queue)
- _current_allocator_context.set(rc)
- }
- rc
- }
+ def current_allocator_context:AllocatorContext = _current_allocator_context.get
+
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Fri Jan 7 00:17:17 2011
@@ -30,13 +30,12 @@ import java.util.Comparator
import jdbm.helper._
import PBSupport._
import org.fusesource.hawtbuf.proto.PBMessageFactory
-import java.io.{EOFException, InputStream, OutputStream, Serializable}
-
+import java.io._
object JDBM2Client extends Log {
- object MessageRecordSerializer extends Serializer[MessageRecord] {
- def serialize(out: SerializerOutput, v: MessageRecord) = encode_message_record(out, v)
- def deserialize(in: SerializerInput) = decode_message_record(in)
+ object MessageRecordSerializer extends Serializer[MessagePB.Buffer] {
+ def serialize(out: SerializerOutput, v: MessagePB.Buffer) = v.writeUnframed(out)
+ def deserialize(in: SerializerInput) = MessagePB.FACTORY.parseUnframed(in).freeze
}
object QueueRecordSerializer extends Serializer[QueueRecord] {
@@ -49,6 +48,18 @@ object JDBM2Client extends Log {
def deserialize(in: SerializerInput) = decode_queue_entry_record(in)
}
+ object LobValueSerializer extends Serializer[(Int, Long, Int)] {
+ def serialize(out: SerializerOutput, v: (Int,Long, Int)) = {
+ out.writePackedInt(v._1)
+ out.writePackedLong(v._2)
+ out.writePackedInt(v._3)
+ }
+
+ def deserialize(in: SerializerInput) = {
+ (in.readPackedInt, in.readPackedLong, in.readPackedInt)
+ }
+ }
+
object QueueEntryKeySerializer extends Serializer[(Long,Long)] {
def serialize(out: SerializerOutput, v: (Long,Long)) = {
out.writePackedLong(v._1)
@@ -136,17 +147,30 @@ class JDBM2Client(store: JDBM2Store) {
var queues_db:HTree[Long, QueueRecord] = _
var entries_db:BTree[(Long,Long), QueueEntryRecord] = _
- var messages_db:HTree[Long, MessageRecord] = _
+ var messages_db:HTree[Long, MessagePB.Buffer] = _
+ var lobs_db:HTree[Long, (Int, Long, Int)] = _
var message_refs_db:HTree[Long, java.lang.Integer] = _
var last_message_key = 0L
var last_queue_key = 0L
+ var direct_buffer_allocator: FileDirectBufferAllocator = _
+
+ def zero_copy_dir = {
+ import FileSupport._
+ config.directory / "zerocp"
+ }
+
def start() = {
+ import FileSupport._
config.directory.mkdirs
- import FileSupport._
+ if( Option(config.zero_copy).map(_.booleanValue).getOrElse(false) ) {
+ direct_buffer_allocator = new FileDirectBufferAllocator(zero_copy_dir)
+ direct_buffer_allocator.start
+ }
+
recman = RecordManagerFactory.createRecordManager((config.directory / "jdbm2").getCanonicalPath)
def init_btree[K, V](name: String, key_comparator:Comparator[K]=ComparableComparator.INSTANCE.asInstanceOf[Comparator[K]], key_serializer:Serializer[K]=null, value_serializer:Serializer[V]=null) = {
@@ -175,15 +199,22 @@ class JDBM2Client(store: JDBM2Store) {
rc
}
-
transaction {
messages_db = init_htree("messages", value_serializer = MessageRecordSerializer)
+ lobs_db = init_htree("lobs", value_serializer = LobValueSerializer)
message_refs_db = init_htree("message_refs")
queues_db = init_htree("queues", value_serializer = QueueRecordSerializer)
entries_db = init_btree("enttries", new QueueEntryKeyComparator, QueueEntryKeySerializer, QueueEntryRecordSerializer)
last_message_key = Option(recman.getNamedObject("last_message_key")).map(_.longValue).getOrElse(0L)
last_queue_key = Option(recman.getNamedObject("last_queue_key")).map(_.longValue).getOrElse(0L)
+
+ if( direct_buffer_allocator!=null ) {
+ lobs_db.cursor { (_,v)=>
+ direct_buffer_allocator.alloc_at(v._1, v._2, v._3)
+ true
+ }
+ }
}
}
@@ -191,6 +222,10 @@ class JDBM2Client(store: JDBM2Store) {
def stop() = {
recman.close
recman = null;
+ if( direct_buffer_allocator!=null ) {
+ direct_buffer_allocator.stop
+ direct_buffer_allocator = null
+ }
}
def transaction[T](func: => T): T = {
@@ -214,6 +249,7 @@ class JDBM2Client(store: JDBM2Store) {
if( config.directory.isDirectory ) {
config.directory.listFiles.filter(_.getName.startsWith("jdbm2.")).foreach(_.delete)
}
+ zero_copy_dir.delete
}
if( recman!=null ) {
stop
@@ -267,6 +303,12 @@ class JDBM2Client(store: JDBM2Store) {
gc.foreach { key=>
message_refs_db.remove(key)
messages_db.remove(key)
+ if( direct_buffer_allocator!=null ){
+ val location = lobs_db.find(key)
+ if( location!=null ) {
+ direct_buffer_allocator.free(location._1, location._2, location._3)
+ }
+ }
}
}
recman.defrag
@@ -304,11 +346,25 @@ class JDBM2Client(store: JDBM2Store) {
def store(uows: Seq[JDBM2Store#DelayableUOW], callback:Runnable) {
transaction {
+ var needs_direct_buffer_sync = Set[Int]()
uows.foreach { uow =>
uow.actions.foreach { case (msg, action) =>
- if (action.messageRecord != null) {
- messages_db.put(action.messageRecord.key, action.messageRecord)
+ val message_record = action.messageRecord
+ if (message_record != null) {
+
+ val pb = if( message_record.direct_buffer != null ) {
+ val r = to_pb(action.messageRecord).copy
+ val buffer = direct_buffer_allocator.to_alloc_buffer(message_record.direct_buffer)
+ lobs_db.put(message_record.key, (buffer.file, buffer.offset, buffer.size))
+ needs_direct_buffer_sync += buffer.file
+ r.setDirect(true)
+ r.freeze
+ } else {
+ to_pb(action.messageRecord)
+ }
+
+ messages_db.put(action.messageRecord.key, pb)
if( action.messageRecord.key > last_message_key ) {
last_message_key = action.messageRecord.key
recman.setNamedObject("last_message_key", last_message_key)
@@ -327,6 +383,9 @@ class JDBM2Client(store: JDBM2Store) {
}
}
+ if( direct_buffer_allocator!=null ) {
+ needs_direct_buffer_sync.foreach(direct_buffer_allocator.sync(_))
+ }
}
callback.run
}
@@ -385,23 +444,19 @@ class JDBM2Client(store: JDBM2Store) {
var metric_load_from_index = metric_load_from_index_counter(false)
def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
- val records = requests.flatMap { case (message_key, callback)=>
+ requests.foreach { case (message_key, callback)=>
val record = metric_load_from_index_counter.time {
- Option(messages_db.find(message_key))
- }
- record match {
- case None =>
- debug("Message not indexed: %s", message_key)
- callback(None)
- None
- case Some(x) => Some((record, callback))
+ Option(messages_db.find(message_key)).map{ pb=>
+ val rc = from_pb(pb)
+ if( pb.getDirect ) {
+ val location = lobs_db.find(message_key)
+ rc.direct_buffer = direct_buffer_allocator.view_buffer(location._1, location._2, location._3)
+ }
+ rc
+ }
}
+ callback(record)
}
-
- records.foreach { case (record, callback)=>
- callback( record )
- }
-
}
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Fri Jan 7 00:17:17 2011
@@ -80,6 +80,8 @@ class JDBM2Store extends DelayingStoreSu
protected def get_next_msg_key = next_msg_key.getAndIncrement
+ override def direct_buffer_allocator():DirectBufferAllocator = client.direct_buffer_allocator
+
protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
executor {
client.store(uows, ^{
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java Fri Jan 7 00:17:17 2011
@@ -34,7 +34,9 @@ public class JDBM2StoreDTO extends Store
@XmlAttribute
public File directory;
- @XmlAttribute
+ @XmlAttribute(name="compact_interval")
public Integer compact_interval;
+ @XmlAttribute(name="zero_copy")
+ public Boolean zero_copy;
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1056135&r1=1056134&r2=1056135&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Fri Jan 7 00:17:17 2011
@@ -348,6 +348,8 @@ class StompCodec extends ProtocolCodec w
val count = read_direct.write(read_channel, read_direct_pos)
if (count == -1) {
throw new EOFException("Peer disconnected")
+ } else if (count == 0) {
+ return null
}
read_direct_pos += count
} else if (read_end == read_buffer.position() ) {