You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:17:36 UTC
svn commit: r961197 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/
activemq-hawtdb/src/main/proto/ a...
Author: chirino
Date: Wed Jul 7 04:17:35 2010
New Revision: 961197
URL: http://svn.apache.org/viewvc?rev=961197&view=rev
Log:
Adding a new DirectRecordStore to allow the protocols to stream large messages out of band (from the point of view of the JVM GC).
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/MultiProtocol.scala Wed Jul 7 04:17:35 2010
@@ -50,7 +50,7 @@ class MultiProtocolHandler extends Proto
override def onTransportCommand(command:Any) = {
if (!command.isInstanceOf[WireFormat]) {
- throw new ProtocolException("First command should be a WireFormat");
+ throw new ProtocolException("Expected WireFormat");
}
var wireformat:WireFormat = command.asInstanceOf[WireFormat];
@@ -67,6 +67,8 @@ class MultiProtocolHandler extends Proto
// replace the current handler with the new one.
connection.protocol = protocol
connection.protocolHandler = protocolHandler
+ connection.transport.setWireformat(wireformat)
+
connection.transport.suspendRead
protocolHandler.onTransportConnected
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-cassandra/src/main/scala/org/apache/activemq/broker/store/cassandra/CassandraClient.scala Wed Jul 7 04:17:35 2010
@@ -62,7 +62,7 @@ class CassandraClient() {
rc.protocol = pb.getProtocol
rc.size = pb.getSize
rc.value = pb.getValue
- rc.stream = pb.getStream
+ rc.directKey = pb.getStream
rc.expiration = pb.getExpiration
rc
}
@@ -72,7 +72,7 @@ class CassandraClient() {
pb.setProtocol(v.protocol)
pb.setSize(v.size)
pb.setValue(v.value)
- pb.setStream(v.stream)
+ pb.setStream(v.directKey)
pb.setExpiration(v.expiration)
pb.freeze.toUnframedByteArray
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/proto/data.proto Wed Jul 7 04:17:35 2010
@@ -34,10 +34,8 @@ enum Type {
PUT_MAP_ENTRY = 32;
REMOVE_MAP_ENTRY = 33;
- OPEN_STREAM = 40;
- WRITE_STREAM = 41;
- CLOSE_STREAM = 42;
- REMOVE_STREAM = 43;
+ ADD_DIRECT = 40;
+ REMOVE_DIRECT = 41;
ADD_SUBSCRIPTION = 50;
REMOVE_SUBSCRIPTION = 51;
@@ -134,20 +132,16 @@ message RemoveMapEntry {
}
///////////////////////////////////////////////////////////////
-// Stream related operations.
+// Direct buffer related operations.
///////////////////////////////////////////////////////////////
-message OpenStream {
- required int64 streamKey=1;
+message AddDirect {
+ required int64 directKey=1;
+ required int32 size=2;
+ required int32 page=3;
}
-message WriteStream {
- required int64 streamKey=1;
- optional bytes data = 2;
-}
-message CloseStream {
- required int64 streamKey=1;
-}
-message RemoveStream {
- required int64 streamKey=1;
+
+message RemoveDirect {
+ required int64 directKey=1;
}
@@ -167,7 +161,8 @@ message DatabaseRootRecord {
optional fixed32 queueIndexPage=53;
optional fixed32 subscriptionIndexPage=54;
optional fixed32 mapIndexPage=55;
-
+ optional fixed32 directIndexPage=56;
+
}
message QueueRootRecord {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBClient.scala Wed Jul 7 04:17:35 2010
@@ -23,15 +23,9 @@ import model.{AddQueue, AddQueueEntry, A
import org.apache.activemq.apollo.dto.HawtDBStoreDTO
import java.io.File
import java.io.IOException
-import java.util.concurrent.TimeUnit
-import org.fusesource.hawtbuf.proto.MessageBuffer
-import org.fusesource.hawtbuf.proto.PBMessage
import org.apache.activemq.util.LockFile
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import org.fusesource.hawtdb.internal.journal.{JournalListener, Journal, Location}
-import org.fusesource.hawtdispatch.TaskTracker
-
-import org.fusesource.hawtbuf.AsciiBuffer._
import org.apache.activemq.broker.store.hawtdb.model.Type._
import org.apache.activemq.broker.store.hawtdb.model._
import org.fusesource.hawtbuf._
@@ -44,6 +38,10 @@ import org.fusesource.hawtdb.api._
import org.apache.activemq.apollo.broker.{DispatchLogging, Log, Logging, BaseService}
import org.apache.activemq.apollo.util.TimeCounter
import org.apache.activemq.apollo.store._
+import org.fusesource.hawtdb.api.Paged.SliceType
+import java.util.concurrent.TimeUnit
+
+
object HawtDBClient extends Log {
val BEGIN = -1
@@ -69,7 +67,8 @@ class HawtDBClient(hawtDBStore: HawtDBSt
def dispatchQueue = hawtDBStore.dispatchQueue
- private val pageFileFactory = new TxPageFileFactory()
+ private val directFileFactory = new PageFileFactory()
+ private val indexFileFactory = new TxPageFileFactory()
private var journal: Journal = null
private var lockFile: LockFile = null
@@ -107,7 +106,9 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private def failIfDatabaseIsLocked = config.failIfLocked
- private def pageFile = pageFileFactory.getTxPageFile()
+ private def directFile = directFileFactory.getPageFile
+
+ private def indexFile = indexFileFactory.getTxPageFile()
/////////////////////////////////////////////////////////////////////
@@ -176,14 +177,19 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
journal.start
- pageFileFactory.setFile(new File(directory, "db"))
- pageFileFactory.setDrainOnClose(false)
- pageFileFactory.setSync(true)
- pageFileFactory.setUseWorkerThread(true)
- pageFileFactory.setPageSize(config.indexPageSize)
- pageFileFactory.setCacheSize(config.indexCacheSize);
+ directFileFactory.setFile(new File(directory, "direct"));
+ directFileFactory.setHeaderSize(0);
+ directFileFactory.setPageSize(1024)
+ directFileFactory.open
+
+ indexFileFactory.setFile(new File(directory, "db"))
+ indexFileFactory.setDrainOnClose(false)
+ indexFileFactory.setSync(true)
+ indexFileFactory.setUseWorkerThread(true)
+ indexFileFactory.setPageSize(config.indexPageSize)
+ indexFileFactory.setCacheSize(config.indexCacheSize);
- pageFileFactory.open()
+ indexFileFactory.open
val initialized = withTx { tx =>
if (!tx.allocator().isAllocated(0)) {
@@ -198,6 +204,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
rootBean.setDataFileRefIndexPage(alloc(DATA_FILE_REF_INDEX_FACTORY))
rootBean.setMessageRefsIndexPage(alloc(MESSAGE_REFS_INDEX_FACTORY))
rootBean.setSubscriptionIndexPage(alloc(SUBSCRIPTIONS_INDEX_FACTORY))
+ rootBean.setDirectIndexPage(alloc(DIRECT_INDEX_FACTORY))
storedRootBuffer = rootBean.freeze
helper.storeRootBean
@@ -210,11 +217,25 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
if( initialized ) {
- pageFile.flush()
+ indexFile.flush()
}
recover(onComplete)
+ // update the allocated free list in the direct file
+ // by unfreeing the items contained in the direct index.
+ withTx { tx =>
+ val helper = new TxHelper(tx)
+ import JavaConversions._
+ import helper._
+
+ directIndex.iterator.foreach { entry =>
+ val record = entry.getValue
+ val page_count: Int = directFile.pages(record.getSize)
+ directFile.allocator.unfree(record.getPage, page_count)
+ }
+ }
+
// Schedule periodic jobs.. they keep executing while schedule_version remains the same.
scheduleCleanup(schedule_version.get())
scheduleFlush(schedule_version.get())
@@ -224,10 +245,66 @@ class HawtDBClient(hawtDBStore: HawtDBSt
+ /**
+ * Stops the client: bumps schedule_version (the periodic jobs only keep running while it
+ * stays the same), closes the journal and both page files, then releases the lock file.
+ */
def stop() = {
schedule_version.incrementAndGet
journal.close
- pageFileFactory.close
+ indexFileFactory.close
+ // The direct page file is opened alongside the index file during startup, so it has to be
+ // closed here as well; otherwise it stays open after the store is stopped.
+ directFileFactory.close
lockFile.unlock
}
+ val last_direct_key = new AtomicLong
+
+ /**
+ * Allocates pages in the direct file for a record of the requested size, stores an
+ * AddDirect update describing the allocation, and hands the new record to `callback`.
+ *
+ * NOTE(review): keys are taken from last_direct_key, a freshly constructed AtomicLong;
+ * nothing visible in this change re-seeds it from the direct index after a restart --
+ * confirm that newly issued keys cannot collide with keys already stored.
+ *
+ * @param size the size of the record to allocate
+ * @param callback receives the new DirectRecord (key, size and a READ_WRITE buffer)
+ */
+ def createDirectRecord(size: Int)(callback: (DirectRecord) => Unit) = {
+
+ // Page count the direct file reports for the requested size.
+ val page_count: Int = directFile.pages(size)
+ val page = directFile.allocator.alloc(page_count)
+ // READ_WRITE slice over the pages that were just allocated.
+ val buffer = directFile.slice(SliceType.READ_WRITE, page, page_count)
+ val key = last_direct_key.incrementAndGet
+
+ // Record key, size and first page so the allocation can be found again by key.
+ val pb = new AddDirect.Bean
+ pb.setDirectKey(key)
+ pb.setSize(size)
+ pb.setPage(page)
+ _store(pb, null)
+
+ val record = new DirectRecord
+ record.key = key
+ record.size = size
+ record.buffer = buffer
+ callback(record)
+ }
+
+ /**
+ * Looks up `key` in the direct index and passes the outcome to `callback`:
+ * Some(record) backed by a READ slice of the direct file when an entry exists,
+ * None when the index has no entry for the key.
+ *
+ * @param key the key of the direct record to open
+ * @param callback receives the lookup result once the index transaction has finished
+ */
+ def openDirectRecord(key: Long)(callback: (Option[DirectRecord]) => Unit) = {
+ val result = withTx { tx =>
+ val helper = new TxHelper(tx)
+ import helper._
+ val pb:AddDirect.Getter = directIndex.get(key)
+ if( pb!=null ) {
+ // Recompute the page count from the stored size to slice the same page range.
+ val page_count: Int = directFile.pages(pb.getSize)
+ val buffer = directFile.slice(SliceType.READ, pb.getPage, page_count)
+
+ val record = new DirectRecord
+ record.key = key
+ record.size = pb.getSize
+ record.buffer = buffer
+
+ Some(record)
+ } else {
+ None
+ }
+ }
+ // The callback runs after the withTx block has returned its result.
+ callback(result)
+ }
+
+
+ /**
+ * Hands the record's buffer back to the direct file via unslice. The buffer is expected
+ * to be one obtained from directFile.slice (see createDirectRecord / openDirectRecord).
+ */
+ def closeDirectRecord(record: DirectRecord) = {
+ directFile.unslice(record.buffer)
+ }
+
+ /**
+ * Stores a RemoveDirect update for the given key and then notifies the caller.
+ *
+ * @param key the key of the direct record to remove
+ * @param callback invoked with true once the removal update has been handed to _store
+ */
+ def removeDirectRecord(key: Long)(callback: (Boolean) => Unit) = {
+ val update = new RemoveDirect.Bean
+ update.setDirectKey(key)
+ _store(update, null)
+ // Mirrors createDirectRecord, which notifies its caller right after storing the update.
+ // Previously the callback was never invoked, so callers waiting on it never heard back.
+ callback(true)
+ }
+
+
def addQueue(record: QueueRecord, callback:Runnable) = {
val update = new AddQueue.Bean()
update.setKey(record.key)
@@ -290,7 +367,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
def getQueueStatus(queueKey: Long): Option[QueueStatus] = {
withTx { tx =>
val helper = new TxHelper(tx)
- import JavaConversions._
import helper._
val queueRecord = queueIndex.get(queueKey)
@@ -319,8 +395,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
val helper = new TxHelper(tx)
import JavaConversions._
import helper._
- import Predicates._
-
val queueRecord = queueIndex.get(queueKey)
if (queueRecord != null) {
val entryIndex = queueEntryIndex(queueRecord)
@@ -383,7 +457,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
val locations = withTx { tx =>
val helper = new TxHelper(tx)
- import JavaConversions._
import helper._
requests.flatMap { case (messageKey, callback)=>
val location = metric_load_from_index.time {
@@ -412,7 +485,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
metric_load_from_index.start { end =>
withTx { tx =>
val helper = new TxHelper(tx)
- import JavaConversions._
import helper._
val location = messageKeyIndex.get(messageKey)
@@ -922,10 +994,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
case x: PutMapEntry.Getter =>
case x: RemoveMapEntry.Getter =>
- case x: OpenStream.Getter =>
- case x: WriteStream.Getter =>
- case x: CloseStream.Getter =>
- case x: RemoveStream.Getter =>
+ case x: AddDirect.Getter =>
+
+ directIndex.put(x.key, x.freeze)
+
+ case x: RemoveDirect.Getter =>
+
+ val record:AddDirect.Getter = directIndex.remove(x.getDirectKey)
+ if( record!=null ) {
+ val page_count: Int = directFile.pages(record.getSize)
+ directFile.allocator.free(record.getPage, page_count)
+ }
+
}
}
@@ -950,7 +1030,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
def flush() = {
val start = System.currentTimeMillis()
- pageFile.flush
+ indexFile.flush
val end = System.currentTimeMillis()
if (end - start > 1000) {
warn("Index flush latency: %,.3f seconds", ((end - start) / 1000.0f))
@@ -1043,6 +1123,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, rootBuffer.getMessageKeyIndexPage)
lazy val messageRefsIndex = MESSAGE_REFS_INDEX_FACTORY.open(_tx, rootBuffer.getMessageRefsIndexPage)
lazy val subscriptionIndex = SUBSCRIPTIONS_INDEX_FACTORY.open(_tx, rootBuffer.getSubscriptionIndexPage)
+ lazy val directIndex = DIRECT_INDEX_FACTORY.open(_tx, rootBuffer.getDirectIndexPage)
def addAndGet[K](index:SortedIndex[K, jl.Integer], key:K, amount:Int):Int = {
var counter = index.get(key)
@@ -1104,7 +1185,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
private def withTx[T](func: (Transaction) => T): T = {
- val tx = pageFile.tx
+ val tx = indexFile.tx
var ok = false
try {
val rc = func(tx)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStore.scala Wed Jul 7 04:17:35 2010
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.broker.store.hawtdb
-import org.apache.activemq.broker.store.{StoreUOW, Store}
import org.fusesource.hawtdispatch.BaseRetained
import java.util.concurrent.atomic.AtomicLong
import collection.mutable.ListBuffer
@@ -30,6 +29,7 @@ import ReporterLevel._
import java.util.concurrent._
import org.apache.activemq.apollo.store._
import org.apache.activemq.apollo.util.{IntMetricCounter, TimeCounter, IntCounter}
+import org.apache.activemq.broker.store.{DirectRecordStore, StoreUOW, Store}
object HawtDBStore extends Log {
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -58,7 +58,7 @@ object HawtDBStore extends Log {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class HawtDBStore extends Store with BaseService with DispatchLogging {
+class HawtDBStore extends Store with DirectRecordStore with BaseService with DispatchLogging {
import HawtDBStore._
override protected def log = HawtDBStore
@@ -125,7 +125,37 @@ class HawtDBStore extends Store with Bas
/////////////////////////////////////////////////////////////////////
//
- // Implementation of the BrokerDatabase interface
+ // Implementation of the DirectRecordStore interface
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ /** Delegates to client.createDirectRecord from inside an executor_pool block. */
+ def createDirectRecord(size: Int)(callback: (DirectRecord) => Unit) = {
+ executor_pool {
+ client.createDirectRecord(size)(callback)
+ }
+ }
+
+ /** Delegates to client.openDirectRecord from inside an executor_pool block. */
+ def openDirectRecord(key: Long)(callback: (Option[DirectRecord]) => Unit) = {
+ executor_pool {
+ client.openDirectRecord(key)(callback)
+ }
+ }
+
+ /** Delegates to client.closeDirectRecord from inside an executor_pool block. */
+ def closeDirectRecord(record: DirectRecord) = {
+ executor_pool {
+ client.closeDirectRecord(record)
+ }
+ }
+
+ /** Delegates to client.removeDirectRecord from inside an executor_pool block. */
+ def removeDirectRecord(key: Long)(callback: (Boolean) => Unit) = {
+ executor_pool {
+ client.removeDirectRecord(key)(callback)
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Implementation of the Store interface
//
/////////////////////////////////////////////////////////////////////
@@ -133,11 +163,13 @@ class HawtDBStore extends Store with Bas
* Deletes all stored data from the store.
*/
def purge(callback: =>Unit) = {
- client.purge(^{
- next_queue_key.set(1)
- next_msg_key.set(1)
- callback
- })
+ executor_pool {
+ client.purge(^{
+ next_queue_key.set(1)
+ next_msg_key.set(1)
+ callback
+ })
+ }
}
@@ -145,29 +177,37 @@ class HawtDBStore extends Store with Bas
* Gets the last queue key identifier stored.
*/
def getLastQueueKey(callback:(Option[Long])=>Unit):Unit = {
- callback(Some(client.rootBuffer.getLastQueueKey.longValue))
+ executor_pool {
+ callback(Some(client.rootBuffer.getLastQueueKey.longValue))
+ }
}
def addQueue(record: QueueRecord)(callback: (Boolean) => Unit) = {
- client.addQueue(record, ^{ callback(true) })
+ executor_pool {
+ client.addQueue(record, ^{ callback(true) })
+ }
}
def removeQueue(queueKey: Long)(callback: (Boolean) => Unit) = {
- client.removeQueue(queueKey,^{ callback(true) })
+ executor_pool {
+ client.removeQueue(queueKey,^{ callback(true) })
+ }
}
def getQueueStatus(queueKey: Long)(callback: (Option[QueueStatus]) => Unit) = {
- executor_pool ^{
+ executor_pool {
callback( client.getQueueStatus(queueKey) )
}
}
def listQueues(callback: (Seq[Long]) => Unit) = {
- executor_pool ^{
+ executor_pool {
callback( client.listQueues )
}
}
+
+
val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
load_source.setEventHandler(^{drain_loads});
load_source.resume
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Helpers.scala Wed Jul 7 04:17:35 2010
@@ -18,13 +18,13 @@ package org.apache.activemq.broker.store
import model._
import model.Type.TypeCreatable
-import org.apache.activemq.apollo.store.{MessageRecord, QueueRecord, QueueEntryRecord}
import org.fusesource.hawtbuf.codec._
import org.fusesource.hawtbuf.{UTF8Buffer, AsciiBuffer, Buffer}
import java.io.{IOException, DataInput, DataOutput}
import org.fusesource.hawtdb.internal.journal.{LocationCodec, Location}
import org.fusesource.hawtdb.api._
import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessage}
+import org.apache.activemq.apollo.store.{DirectRecord, MessageRecord, QueueRecord, QueueEntryRecord}
/**
* <p>
@@ -109,7 +109,7 @@ object Helpers {
rc.protocol = pb.getProtocol
rc.size = pb.getSize
rc.value = pb.getValue
- rc.stream = pb.getStreamKey
+ rc.directKey = pb.getStreamKey
rc.expiration = pb.getExpiration
rc
}
@@ -120,7 +120,7 @@ object Helpers {
pb.setProtocol(v.protocol)
pb.setSize(v.size)
pb.setValue(v.value)
- pb.setStreamKey(v.stream)
+ pb.setStreamKey(v.directKey)
pb.setExpiration(v.expiration)
pb
}
@@ -147,6 +147,13 @@ object Helpers {
pb
}
+ /**
+ * Converts an AddDirect protobuf into a DirectRecord carrying only the key and size;
+ * the record's buffer is not set by this conversion.
+ */
+ implicit def toDirectRecord(pb: AddDirect.Getter): DirectRecord = {
+ val rc = new DirectRecord
+ rc.key = pb.getDirectKey
+ rc.size = pb.getSize
+ rc
+ }
+
implicit def toLocation(value: Long): Location = {
val temp = new Buffer(8)
val editor = temp.bigEndianEditor
@@ -228,4 +235,9 @@ object Helpers {
SUBSCRIPTIONS_INDEX_FACTORY.setValueCodec(AddSubscription.FRAMED_CODEC);
SUBSCRIPTIONS_INDEX_FACTORY.setDeferredEncoding(true);
+ val DIRECT_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, AddDirect.Buffer]();
+ DIRECT_INDEX_FACTORY.setKeyCodec(LongCodec.INSTANCE);
+ DIRECT_INDEX_FACTORY.setValueCodec(AddDirect.FRAMED_CODEC);
+ DIRECT_INDEX_FACTORY.setDeferredEncoding(true);
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 04:17:35 2010
@@ -29,9 +29,9 @@ import Stomp._
import BufferConversions._
import StompFrameConstants._
import java.io.IOException
-import org.apache.activemq.broker.store.StoreUOW
import org.apache.activemq.selector.SelectorParser
import org.apache.activemq.filter.{BooleanExpression, FilterException}
+import org.apache.activemq.broker.store.{DirectRecordStore, StoreUOW}
object StompConstants {
@@ -170,6 +170,10 @@ class StompProtocolHandler extends Proto
connection.connector.broker.getDefaultVirtualHost(
queue.wrap { (host)=>
this.host=host
+ if( this.host.store!=null && this.host.store.isInstanceOf[DirectRecordStore] ) {
+ val wf = connection.transport.getWireformat.asInstanceOf[StompWireFormat]
+ wf.direct_record_store = this.host.store.asInstanceOf[DirectRecordStore]
+ }
connection.transport.resumeRead
}
)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 04:17:35 2010
@@ -70,7 +70,7 @@ class StompWireFormat extends WireFormat
import StompWireFormat._
override protected def log: Log = StompWireFormat
- var directRecordStore:DirectRecordStore = null
+ var direct_record_store:DirectRecordStore = null
implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
implicit def wrap(x: Byte) = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/DirectRecord.java Wed Jul 7 04:17:35 2010
@@ -22,13 +22,14 @@ import org.fusesource.hawtbuf.Buffer;
import java.nio.ByteBuffer;
/**
- * A memory mapped record.
+ * A memory mapped direct buffer associated with a key
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class DirectRecord {
public long key = -1;
+ public int size = 0;
public ByteBuffer buffer;
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java Wed Jul 7 04:17:35 2010
@@ -28,7 +28,7 @@ public class MessageRecord {
public AsciiBuffer protocol;
public int size;
public Buffer value;
- public long stream = -1;
+ public long directKey = -1;
public long expiration = 0;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?rev=961197&r1=961196&r2=961197&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul 7 04:17:35 2010
@@ -105,6 +105,24 @@ trait Store extends ServiceTrait {
*/
def loadMessage(messageKey:Long)(callback:(Option[MessageRecord])=>Unit )
-
}
+/**
+ * Optional interface that stores can implement to give protocols direct access to the file system,
+ * so that they can stream large messages out of band (from the point of view of the JVM GC).
+ */
+trait DirectRecordStore {
+
+ /** Creates a direct record of the given size and passes it to the callback. */
+ def createDirectRecord(size:Int)(callback:(DirectRecord)=>Unit):Unit
+
+ /** Opens the direct record stored under the key; the callback receives Some(record) or None. */
+ def openDirectRecord(key:Long)(callback:(Option[DirectRecord])=>Unit):Unit
+
+ /** Closes a record previously obtained from createDirectRecord or openDirectRecord. */
+ def closeDirectRecord(record:DirectRecord):Unit
+
+ /** Convenience overload: removes the record identified by record.key. */
+ def removeDirectRecord(record:DirectRecord)(callback:(Boolean)=>Unit):Unit = {
+ removeDirectRecord(record.key)( callback )
+ }
+ /** Removes the direct record stored under the key; the callback receives a Boolean result. */
+ def removeDirectRecord(key:Long)(callback:(Boolean)=>Unit):Unit
+
+
+}