You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:22:31 UTC
svn commit: r961216 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
activemq-hawtdb/src/main/resources/META-IN...
Author: chirino
Date: Wed Jul 7 04:22:31 2010
New Revision: 961216
URL: http://svn.apache.org/viewvc?rev=961216&view=rev
Log:
renamed MemoryPool to DirectBufferPool.
starting to hooking in support for in in the store interfaces.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools
- copied, changed from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPool.scala
- copied, changed from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPool.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPoolSPI.scala
- copied, changed from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPool.scala
- copied, changed from r961215, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPool.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala
- copied, changed from r961215, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPoolFactory.scala
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPool.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPool.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPoolFactory.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 04:22:31 2010
@@ -169,7 +169,7 @@ class Delivery extends BaseRetained {
def createMessageRecord() = {
val sm = new MessageRecord
sm.protocol = message.protocol
- sm.value = ProtocolFactory.get(message.protocol).encode(message)
+ sm.buffer = ProtocolFactory.get(message.protocol).encode(message)
sm.size = size
sm
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:22:31 2010
@@ -1167,7 +1167,7 @@ class QueueEntry(val queue:Queue, val se
queue.loading_size -= size
val delivery = new Delivery()
- delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.value)
+ delivery.message = ProtocolFactory.get(messageRecord.protocol).decode(messageRecord.buffer)
delivery.size = messageRecord.size
delivery.storeKey = messageRecord.key
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:22:31 2010
@@ -17,11 +17,9 @@
package org.apache.activemq.apollo.broker;
import _root_.java.util.{ArrayList, HashMap}
-import _root_.org.apache.activemq.Service
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue}
import _root_.scala.collection.JavaConversions._
-import _root_.scala.reflect.BeanProperty
import path.PathFilter
import org.fusesource.hawtbuf.AsciiBuffer
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
@@ -31,10 +29,9 @@ import org.apache.activemq.broker.store.
import org.fusesource.hawtbuf.proto.WireFormat
import org.apache.activemq.apollo.store.{StoreFactory, QueueRecord}
import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, CassandraStoreDTO, VirtualHostDTO}
-import java.io.File
import java.util.concurrent.TimeUnit
import org.apache.activemq.apollo.util.LongCounter
-import org.apache.activemq.apollo.{MemoryPoolFactory, MemoryPool}
+import org.apache.activemq.apollo.{DirectBufferPoolFactory, DirectBufferPool}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -103,7 +100,7 @@ class VirtualHost(val broker: Broker, va
}
var store:Store = null
- var memory_pool:MemoryPool = null
+ var memory_pool:DirectBufferPool = null
var transactionManager:TransactionManagerX = new TransactionManagerX
var protocols = Map[AsciiBuffer, WireFormat]()
val queue_id_counter = new LongCounter
@@ -128,18 +125,22 @@ class VirtualHost(val broker: Broker, va
override protected def _start(onCompleted:Runnable):Unit = {
-// val memory_pool_config: String = null
- val memory_pool_config: String = "hawtdb:activemq.tmp"
+ val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
+ store = StoreFactory.create(config.store)
- if( MemoryPoolFactory.validate(memory_pool_config) ) {
- memory_pool = MemoryPoolFactory.create(memory_pool_config)
- if( memory_pool!=null ) {
- memory_pool.start
- }
+ // val memory_pool_config: String = null
+ var direct_buffer_pool_config: String = "hawtdb:activemq.tmp"
+
+ if( direct_buffer_pool_config!=null && (store!=null && !store.supportsDirectBuffers) ) {
+ warn("The direct buffer pool will not be used because the configured store does not support them.")
+ direct_buffer_pool_config = null
+ }
+
+ if( direct_buffer_pool_config!=null ) {
+ memory_pool = DirectBufferPoolFactory.create(direct_buffer_pool_config)
+ memory_pool.start
}
- val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
- store = StoreFactory.create(config.store)
if( store!=null ) {
store.configure(config.store, this)
val storeStartupDone = tracker.task("store startup")
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:22:31 2010
@@ -55,24 +55,22 @@ class CassandraClient() {
}
}
- implicit def decodeMessageRecord(v: Array[Byte]): MessageRecord = {
+ def decodeMessageRecord(v: Array[Byte]): MessageRecord = {
import PBMessageRecord._
val pb = PBMessageRecord.FACTORY.parseUnframed(v)
val rc = new MessageRecord
rc.protocol = pb.getProtocol
rc.size = pb.getSize
- rc.value = pb.getValue
- rc.directKey = pb.getStream
+ rc.buffer = pb.getValue
rc.expiration = pb.getExpiration
rc
}
- implicit def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
+ def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
val pb = new PBMessageRecord.Bean
pb.setProtocol(v.protocol)
pb.setSize(v.size)
- pb.setValue(v.value)
- pb.setStream(v.directKey)
+ pb.setValue(v.buffer)
pb.setExpiration(v.expiration)
pb.freeze.toUnframedByteArray
}
@@ -172,7 +170,7 @@ class CassandraClient() {
case (msg, action) =>
var rc =
if (action.store != null) {
- operations ::= Insert( schema.message_data \ (msg, action.store) )
+ operations ::= Insert( schema.message_data \ (msg, encodeMessageRecord(action.store) ) )
}
action.enqueues.foreach {
queueEntry =>
@@ -197,7 +195,7 @@ class CassandraClient() {
session =>
session.get(schema.message_data \ id) match {
case Some(x) =>
- val rc: MessageRecord = x.value
+ val rc: MessageRecord = decodeMessageRecord(x.value)
rc.key = id
Some(rc)
case None =>
Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools (from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pools&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools&r1=961215&r2=961216&rev=961216&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPool.scala (from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPool.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPool.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPool.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPool.scala&r1=961215&r2=961216&rev=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPool.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPool.scala Wed Jul 7 04:22:31 2010
@@ -18,7 +18,7 @@ package org.apache.activemq.broker.store
import org.fusesource.hawtdispatch.BaseRetained
import org.fusesource.hawtdb.api.Paged.SliceType
-import org.apache.activemq.apollo.{MemoryAllocation, MemoryPool}
+import org.apache.activemq.apollo.{DirectBuffer, DirectBufferPool}
import java.nio.ByteBuffer
import org.fusesource.hawtdb.api.PageFileFactory
import java.io.File
@@ -29,7 +29,7 @@ import java.io.File
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class HawtDBMemoryPool(val file:File) extends MemoryPool {
+class HawtDBDirectBufferPool(val file:File) extends DirectBufferPool {
private val pageFilefactory = new PageFileFactory()
private def pageFile = pageFilefactory.getPageFile
@@ -56,10 +56,7 @@ class HawtDBMemoryPool(val file:File) ex
}
}
- class HawtMemoryAllocation(page:Int, page_count:Int, alloc_size:Int, original:ByteBuffer, slice:ByteBuffer) extends BaseRetained with MemoryAllocation {
- def size = alloc_size
- def buffer = slice
-
+ class HawtMemoryAllocation(val page:Int, val page_count:Int, val original:ByteBuffer, val buffer:ByteBuffer) extends BaseRetained with DirectBuffer {
override def dispose = {
pageFile.unslice(original)
pageFile.allocator.free(page, page_count)
@@ -70,10 +67,8 @@ class HawtDBMemoryPool(val file:File) ex
val page_count: Int = pageFile.pages(alloc_size)
val page = pageFile.allocator.alloc(page_count)
val original = pageFile.slice(SliceType.READ_WRITE, page, page_count)
-
original.limit(original.position+alloc_size)
-
val slice = original.slice
- new HawtMemoryAllocation(page, page_count, alloc_size, original, slice)
+ new HawtMemoryAllocation(page, page_count, original, slice)
}
}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPoolSPI.scala (from r961215, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPoolSPI.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPoolSPI.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala&r1=961215&r2=961216&rev=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBDirectBufferPoolSPI.scala Wed Jul 7 04:22:31 2010
@@ -18,27 +18,27 @@ package org.apache.activemq.broker.store
import java.io.File
import java.lang.String
-import org.apache.activemq.apollo.MemoryPoolFactory
+import org.apache.activemq.apollo.DirectBufferPoolFactory
/**
* <p>
- * Hook to use a HawtDBMemoryPool for the memory pool implementation.
+ * Hook to use a HawtDBDirectBufferPool for the memory pool implementation.
* </p>
* <p>
* This class is discovered using the following resource file:
- * <code>META-INF/services/org.apache.activemq.apollo/memory-pools</code>
+ * <code>META-INF/services/org.apache.activemq.apollo/direct-buffer-pools</code>
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class HawtDBMemoryPoolSPI extends MemoryPoolFactory.SPI {
+class HawtDBDirectBufferPoolSPI extends DirectBufferPoolFactory.SPI {
val prefix: String = "hawtdb:"
def create(config: String) = {
if( config.startsWith(prefix) ) {
val file = new File(config.substring(prefix.length))
- new HawtDBMemoryPool(file)
+ new HawtDBDirectBufferPool(file)
} else {
null
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul 7 04:22:31 2010
@@ -108,7 +108,7 @@ object Helpers {
rc.key = pb.getMessageKey
rc.protocol = pb.getProtocol
rc.size = pb.getSize
- rc.value = pb.getValue
+ rc.buffer = pb.getValue
rc.expiration = pb.getExpiration
rc
}
@@ -118,7 +118,7 @@ object Helpers {
pb.setMessageKey(v.key)
pb.setProtocol(v.protocol)
pb.setSize(v.size)
- pb.setValue(v.value)
+ pb.setValue(v.buffer)
pb.setExpiration(v.expiration)
pb
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 04:22:31 2010
@@ -23,7 +23,7 @@ import collection.mutable.ListBuffer
import java.lang.{String, Class}
import java.io.DataOutput
import org.apache.activemq.apollo.broker._
-import org.apache.activemq.apollo.MemoryAllocation
+import org.apache.activemq.apollo.DirectBuffer
import org.fusesource.hawtdispatch.BaseRetained
/**
@@ -192,7 +192,7 @@ case class BufferStompContent(content:Bu
def utf8:UTF8Buffer = content.utf8
}
-case class DirectStompContent(direct:MemoryAllocation) extends StompContent {
+case class DirectStompContent(direct:DirectBuffer) extends StompContent {
def length = direct.size-1
def writeTo(os:DataOutput) = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 04:22:31 2010
@@ -31,7 +31,7 @@ import _root_.scala.collection.JavaConve
import StompFrameConstants._
import java.io.{EOFException, DataOutput, DataInput, IOException}
import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
-import org.apache.activemq.apollo.{MemoryAllocation, MemoryPool}
+import org.apache.activemq.apollo.{DirectBuffer, DirectBufferPool}
/**
* Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
@@ -70,7 +70,7 @@ class StompWireFormat extends WireFormat
import StompWireFormat._
override protected def log: Log = StompWireFormat
- var memory_pool:MemoryPool = null
+ var memory_pool:DirectBufferPool = null
implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
implicit def wrap(x: Byte) = {
@@ -507,7 +507,7 @@ class StompWireFormat extends WireFormat
}
- def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, ma:MemoryAllocation):FrameReader = (buffer)=> {
+ def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, ma:DirectBuffer):FrameReader = (buffer)=> {
if( read_content_direct(ma) ) {
next_action = read_action
new StompFrame(ascii(action), headers.toList, DirectStompContent(ma))
@@ -516,7 +516,7 @@ class StompWireFormat extends WireFormat
}
}
- def read_content_direct(ma:MemoryAllocation) = {
+ def read_content_direct(ma:DirectBuffer) = {
val read_limit = ma.buffer.position
if( read_limit < ma.size ) {
read_end = read_limit
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java Wed Jul 7 04:22:31 2010
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.apollo.store;
+import org.apache.activemq.apollo.DirectBuffer;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
@@ -27,8 +28,8 @@ public class MessageRecord {
public long key = -1;
public AsciiBuffer protocol;
public int size;
- public Buffer value;
- public long directKey = -1;
+ public Buffer buffer;
+ public DirectBuffer direct_buffer = null;
public long expiration = 0;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:22:31 2010
@@ -34,6 +34,12 @@ trait Store extends ServiceTrait {
def storeStatusDTO(callback:(StoreStatusDTO)=>Unit)
/**
+ * @returns true if the store implementation can handle accepting
+ * MessageRecords with DirectBuffers in them.
+ */
+ def supportsDirectBuffers() = false
+
+ /**
* Creates a store uow which is used to perform persistent
* operations as unit of work.
*/
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreBenchmarkSupport.scala Wed Jul 7 04:22:31 2010
@@ -98,8 +98,8 @@ abstract class StoreBenchmarkSupport ext
def addMessage(batch:StoreUOW, content:String):Long = {
var message = new MessageRecord
message.protocol = ascii("test-protocol")
- message.value = ascii(content).buffer
- message.size = message.value.length
+ message.buffer = ascii(content).buffer
+ message.size = message.buffer.length
batch.store(message)
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala?rev=961216&r1=961215&r2=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/apollo/store/StoreFunSuiteSupport.scala Wed Jul 7 04:22:31 2010
@@ -97,8 +97,8 @@ abstract class StoreFunSuiteSupport exte
def addMessage(batch:StoreUOW, content:String):Long = {
var message = new MessageRecord
message.protocol = ascii("test-protocol")
- message.value = ascii(content).buffer
- message.size = message.value.length
+ message.buffer = ascii(content).buffer
+ message.size = message.buffer.length
batch.store(message)
}
@@ -138,7 +138,7 @@ abstract class StoreFunSuiteSupport exte
val rc:Option[MessageRecord] = CB( cb=> store.loadMessage(msgKeys.head)(cb) )
expect(ascii("message 1").buffer) {
- rc.get.value
+ rc.get.buffer
}
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPool.scala (from r961215, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPool.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPool.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPool.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPool.scala&r1=961215&r2=961216&rev=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPool.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPool.scala Wed Jul 7 04:22:31 2010
@@ -22,12 +22,13 @@ import org.apache.activemq.Service
/**
* <p>
+ * A DirectBuffer holds a reference counted Direct ByteBuffer
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait MemoryAllocation extends Retained {
- def size:Int
+trait DirectBuffer extends Retained {
+ def size = buffer.capacity
def buffer:ByteBuffer
}
@@ -37,6 +38,6 @@ trait MemoryAllocation extends Retained
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait MemoryPool extends Service {
- def alloc(size:Int):MemoryAllocation
+trait DirectBufferPool extends Service {
+ def alloc(size:Int):DirectBuffer
}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala (from r961215, activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPoolFactory.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPoolFactory.scala&r1=961215&r2=961216&rev=961216&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/MemoryPoolFactory.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/apollo/DirectBufferPoolFactory.scala Wed Jul 7 04:22:31 2010
@@ -24,37 +24,37 @@ import org.apache.activemq.apollo.util.C
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object MemoryPoolFactory {
+object DirectBufferPoolFactory {
- val finder = ClassFinder[SPI]("META-INF/services/org.apache.activemq.apollo/memory-pools")
- var memory_pool_spis = List[SPI]()
+ val finder = ClassFinder[SPI]("META-INF/services/org.apache.activemq.apollo/direct-buffer-pools")
+ var direct_buffer_pools_spis = List[SPI]()
trait SPI {
- def create(config:String):MemoryPool
+ def create(config:String):DirectBufferPool
def validate(config: String):Boolean
}
finder.find.foreach{ clazz =>
try {
val SPI = clazz.newInstance.asInstanceOf[SPI]
- memory_pool_spis ::= SPI
+ direct_buffer_pools_spis ::= SPI
} catch {
case e:Throwable =>
e.printStackTrace
}
}
- def create(config:String):MemoryPool = {
+ def create(config:String):DirectBufferPool = {
if( config == null ) {
return null
}
- memory_pool_spis.foreach { spi=>
+ direct_buffer_pools_spis.foreach { spi=>
val rc = spi.create(config)
if( rc!=null ) {
return rc
}
}
- throw new IllegalArgumentException("Uknonwn memory pool type: "+config)
+ throw new IllegalArgumentException("Uknonwn direct buffer pool type: "+config)
}
@@ -62,7 +62,7 @@ object MemoryPoolFactory {
if( config == null ) {
return true
} else {
- memory_pool_spis.foreach { spi=>
+ direct_buffer_pools_spis.foreach { spi=>
if( spi.validate(config) ) {
return true
}