You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/29 19:43:03 UTC
svn commit: r1162914 - in /activemq/activemq-apollo/trunk: ./
apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/
apollo-broker/src/main/proto/ apollo-broker/...
Author: chirino
Date: Mon Aug 29 17:43:03 2011
New Revision: 1162914
URL: http://svn.apache.org/viewvc?rev=1162914&view=rev
Log:
Avoid using deprecated hawtdispatch APIs. This in turn required us to rework the zero copy strategy.
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
- copied, changed from r1162770, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileDirectBufferAllocator.scala
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Mon Aug 29 17:43:03 2011
@@ -64,11 +64,11 @@ class BDBClient(store: BDBStore) {
var environment:Environment = _
- var zero_copy_buffer_allocator: FileZeroCopyBufferAllocator = _
+ var direct_buffer_allocator: FileDirectBufferAllocator = _
- def zero_copy_dir = {
+ def direct_buffer_file = {
import FileSupport._
- config.directory / "zerocp"
+ config.directory / "dbuffer.dat"
}
def start() = {
@@ -79,10 +79,7 @@ class BDBClient(store: BDBStore) {
directory.mkdirs
- if( Option(config.zero_copy).map(_.booleanValue).getOrElse(false) ) {
- zero_copy_buffer_allocator = new FileZeroCopyBufferAllocator(zero_copy_dir)
- zero_copy_buffer_allocator.start
- }
+ direct_buffer_allocator = new FileDirectBufferAllocator(direct_buffer_file)
environment = new Environment(directory, env_config);
@@ -92,22 +89,18 @@ class BDBClient(store: BDBStore) {
message_refs_db
queues_db
- if( zero_copy_buffer_allocator!=null ) {
- zerocp_db.cursor(tx) { (_,value)=>
- val v = decode_zcp_value(value)
- zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
- true
- }
+ lobs_db.cursor(tx) { (_,value)=>
+ val v = decode_lob_value(value)
+ direct_buffer_allocator.alloc_at(v._1, v._2)
+ true
}
}
}
def stop() = {
environment.close
- if( zero_copy_buffer_allocator!=null ) {
- zero_copy_buffer_allocator.stop
- zero_copy_buffer_allocator = null
- }
+ direct_buffer_allocator.close
+ direct_buffer_allocator = null
}
case class TxContext(tx:Transaction) {
@@ -137,12 +130,12 @@ class BDBClient(store: BDBStore) {
_messages_db
}
- private var _zerocp_db:Database = _
- def zerocp_db:Database = {
- if( _zerocp_db==null ) {
- _zerocp_db = environment.openDatabase(tx, "zerocp", long_key_conf)
+ private var _lobs_db:Database = _
+ def lobs_db:Database = {
+ if( _lobs_db==null ) {
+ _lobs_db = environment.openDatabase(tx, "lobs", long_key_conf)
}
- _zerocp_db
+ _lobs_db
}
private var _message_refs_db:Database = _
@@ -185,6 +178,9 @@ class BDBClient(store: BDBStore) {
if( _map_db!=null ) {
_map_db.close
}
+ if( _lobs_db!=null ) {
+ _lobs_db.close
+ }
if(ok){
tx.commit
@@ -299,13 +295,11 @@ class BDBClient(store: BDBStore) {
import ctx._
if( add_and_get(message_refs_db, msg_key, -1, tx)==0 ) {
messages_db.delete(tx, msg_key)
- if( zero_copy_buffer_allocator!=null ){
- zerocp_db.get(tx, to_database_entry(msg_key)).foreach { v=>
- val location = decode_zcp_value(v)
- zero_copy_buffer_allocator.free(location._1, location._2, location._3)
- }
- zerocp_db.delete(tx, msg_key)
+ lobs_db.get(tx, to_database_entry(msg_key)).foreach { v=>
+ val location = decode_lob_value(v)
+ direct_buffer_allocator.free(location._1, location._2)
}
+ lobs_db.delete(tx, msg_key)
}
}
@@ -334,7 +328,7 @@ class BDBClient(store: BDBStore) {
val sync = uows.find( ! _.complete_listeners.isEmpty ).isDefined
with_ctx(sync) { ctx=>
import ctx._
- var zcp_files_to_sync = Set[Int]()
+ var sync_lobs = false
uows.foreach { uow =>
for((key,value) <- uow.map_actions) {
@@ -352,14 +346,13 @@ class BDBClient(store: BDBStore) {
if (message_record != null) {
import PBSupport._
- val pb = if( message_record.zero_copy_buffer != null ) {
+ val pb = if( message_record.direct_buffer != null ) {
val r = to_pb(action.message_record).copy
- val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
- r.setZcpFile(buffer.file)
- r.setZcpOffset(buffer.offset)
- r.setZcpSize(buffer.size)
- zerocp_db.put(tx, message_record.key, (buffer.file, buffer.offset, buffer.size))
- zcp_files_to_sync += buffer.file
+ val buffer = direct_buffer_allocator.copy(message_record.direct_buffer)
+ r.setDirectOffset(buffer.offset)
+ r.setDirectSize(buffer.size)
+ lobs_db.put(tx, message_record.key, (buffer.offset, buffer.size))
+ sync_lobs = true
r.freeze
} else {
to_pb(action.message_record)
@@ -379,8 +372,8 @@ class BDBClient(store: BDBStore) {
}
}
}
- if( zero_copy_buffer_allocator!=null ) {
- zcp_files_to_sync.foreach(zero_copy_buffer_allocator.sync(_))
+ if( sync_lobs ) {
+ direct_buffer_allocator.sync
}
}
callback.run
@@ -490,8 +483,8 @@ class BDBClient(store: BDBStore) {
import PBSupport._
val pb:MessagePB.Buffer = data
val rc = from_pb(pb)
- if( pb.hasZcpFile ) {
- rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
+ if( pb.hasDirectSize ) {
+ rc.direct_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
}
rc
}
@@ -519,8 +512,8 @@ class BDBClient(store: BDBStore) {
import PBSupport._
val pb:MessagePB.Buffer = data
val rc = from_pb(pb)
- if( pb.hasZcpFile ) {
- rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
+ if( pb.hasDirectSize ) {
+ rc.direct_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
}
rc
}
@@ -583,16 +576,10 @@ class BDBClient(store: BDBStore) {
messages_db.cursor(tx) { (_, data) =>
import PBSupport._
val pb = MessagePB.FACTORY.parseUnframed(data.getData)
- if( pb.hasZcpFile ) {
- val zcpb = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
- var data = pb.copy
- data.clearZcpFile
- data.clearZcpFile
- // write the pb frame and then the direct buffer data..
- data.freeze.writeFramed(message_stream)
- zcpb.read(message_stream)
- } else {
- pb.writeFramed(message_stream)
+ pb.writeFramed(message_stream)
+ if( pb.hasDirectSize ) {
+ val buffer_cp_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
+ buffer_cp_buffer.read(message_stream)
}
true
}
@@ -641,9 +628,6 @@ class BDBClient(store: BDBStore) {
}
}
- var zcp_counter = 0
- val max_ctx = zero_copy_buffer_allocator.contexts.size
-
streams.using_map_stream { stream=>
foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
map_db.put(tx, pb.getKey, pb.getValue)
@@ -653,19 +637,14 @@ class BDBClient(store: BDBStore) {
streams.using_message_stream { message_stream=>
foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
- val record:MessagePB.Buffer = if( pb.hasZcpSize ) {
- val cp = pb.copy
- val zcpb = zero_copy_buffer_allocator.contexts(zcp_counter % max_ctx).alloc(cp.getZcpSize)
- cp.setZcpFile(zcpb.file)
- cp.setZcpOffset(zcpb.offset)
-
- zcp_counter += 1
- zcpb.write(message_stream)
-
- zerocp_db.put(tx, pb.getMessageKey, (zcpb.file, zcpb.offset, zcpb.size))
- cp.freeze
- } else {
- pb
+ var record:MessagePB.Buffer = pb
+ if( record.hasDirectSize ) {
+ val cp = record.copy
+ val buffer_cp_buffer = direct_buffer_allocator.alloc(cp.getDirectSize)
+ cp.setDirectOffset(buffer_cp_buffer.offset)
+ buffer_cp_buffer.write(message_stream)
+ lobs_db.put(tx, pb.getMessageKey, (buffer_cp_buffer.offset, buffer_cp_buffer.size))
+ record = cp.freeze
}
messages_db.put(tx, record.getMessageKey, record)
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Mon Aug 29 17:43:03 2011
@@ -57,8 +57,6 @@ class BDBStore(var config:BDBStoreDTO) e
protected def get_next_msg_key = next_msg_key.getAndIncrement
- override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = client.zero_copy_buffer_allocator
-
protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
write_executor {
client.store(uows, ^{
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala Mon Aug 29 17:43:03 2011
@@ -38,19 +38,17 @@ object HelperTrait {
implicit def to_buffer(entry: DatabaseEntry): Buffer = new Buffer(entry.getData)
implicit def to_database_entry(v: Buffer): DatabaseEntry = new DatabaseEntry(v.toByteArray)
- implicit def decode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = {
+ implicit def decode_lob_value(entry: DatabaseEntry): (Long,Int) = {
val in = new DataByteArrayInputStream(entry.getData)
- (in.readVarInt(), in.readVarLong(), in.readVarInt())
+ (in.readVarLong(), in.readVarInt())
}
- implicit def encode_zcp_value(v: (Int,Long,Int)): DatabaseEntry = {
+ implicit def encode_zcp_value(v: (Long,Int)): DatabaseEntry = {
val out = new DataByteArrayOutputStream(
- AbstractVarIntSupport.computeVarIntSize(v._1) +
- AbstractVarIntSupport.computeVarLongSize(v._2) +
- AbstractVarIntSupport.computeVarIntSize(v._3)
+ AbstractVarIntSupport.computeVarLongSize(v._1) +
+ AbstractVarIntSupport.computeVarIntSize(v._2)
)
- out.writeVarInt(v._1)
- out.writeVarLong(v._2)
- out.writeVarInt(v._3)
+ out.writeVarLong(v._1)
+ out.writeVarInt(v._2)
new DatabaseEntry(out.toBuffer.data)
}
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java Mon Aug 29 17:43:03 2011
@@ -37,9 +37,6 @@ public class BDBStoreDTO extends StoreDT
@XmlAttribute(name="read_threads")
public Integer read_threads;
- @XmlAttribute(name="zero_copy")
- public Boolean zero_copy;
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -50,7 +47,6 @@ public class BDBStoreDTO extends StoreDT
if (directory != null ? !directory.equals(that.directory) : that.directory != null) return false;
if (read_threads != null ? !read_threads.equals(that.read_threads) : that.read_threads != null) return false;
- if (zero_copy != null ? !zero_copy.equals(that.zero_copy) : that.zero_copy != null) return false;
return true;
}
@@ -60,7 +56,6 @@ public class BDBStoreDTO extends StoreDT
int result = super.hashCode();
result = 31 * result + (directory != null ? directory.hashCode() : 0);
result = 31 * result + (read_threads != null ? read_threads.hashCode() : 0);
- result = 31 * result + (zero_copy != null ? zero_copy.hashCode() : 0);
return result;
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Mon Aug 29 17:43:03 2011
@@ -29,10 +29,10 @@ message MessagePB {
optional bytes value = 4;
optional sint64 expiration = 5;
- optional bytes zcp_data = 10;
- optional int32 zcp_file = 12;
- optional int64 zcp_offset = 13;
- optional int32 zcp_size = 14;
+ optional bytes direct_data = 10;
+ optional bytes direct_file = 12;
+ optional int64 direct_offset = 13;
+ optional int32 direct_size = 14;
}
message QueuePB {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Mon Aug 29 17:43:03 2011
@@ -25,7 +25,7 @@ import org.apache.activemq.apollo.util.O
import org.apache.activemq.apollo.dto._
import security._
import security.SecuredResource.VirtualHostKind
-import store.{PersistentLongCounter, Store, StoreFactory}
+import store._
trait VirtualHostFactory {
def create(broker:Broker, dto:VirtualHostDTO):VirtualHost
@@ -104,6 +104,8 @@ class VirtualHost(val broker: Broker, va
var connection_log:Log = _
var console_log:Log = _
+ var direct_buffer_allocator:DirectBufferAllocator = null
+
def resource_kind = VirtualHostKind
@volatile
@@ -165,6 +167,13 @@ class VirtualHost(val broker: Broker, va
override protected def _start(on_completed:Runnable):Unit = {
apply_update
+ if ( config.heap_bypass.getOrElse(0) > 0 ) {
+ import org.apache.activemq.apollo.util.FileSupport._
+ val tmp_dir = broker.tmp / "heapbypass" / id
+ tmp_dir.recursive_delete
+ direct_buffer_allocator = new ConcurrentFileDirectBufferAllocator(tmp_dir)
+ }
+
store = StoreFactory.create(config.store)
val tracker = new LoggingTracker("virtual host startup", console_log)
@@ -229,7 +238,13 @@ class VirtualHost(val broker: Broker, va
task.run()
}
}
- tracker.callback(on_completed)
+ tracker.callback(dispatch_queue.runnable {
+ if( direct_buffer_allocator !=null ) {
+ direct_buffer_allocator.close
+ direct_buffer_allocator
+ }
+ on_completed.run()
+ })
}
Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala (from r1162770, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala&r1=1162770&r2=1162914&rev=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala Mon Aug 29 17:43:03 2011
@@ -26,8 +26,9 @@ import org.fusesource.hawtdispatch.Retai
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait ZeroCopyBufferAllocator {
- def alloc(size:Int):ZeroCopyBuffer
+trait DirectBufferAllocator {
+ def alloc(size:Int):DirectBuffer
+ def close
}
/**
@@ -40,7 +41,7 @@ trait ZeroCopyBufferAllocator {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait ZeroCopyBuffer extends Retained {
+trait DirectBuffer extends Retained {
def size:Int
@@ -50,6 +51,8 @@ trait ZeroCopyBuffer extends Retained {
def read(src: Int, target: WritableByteChannel): Int
+ def copy(src:DirectBuffer): Unit
+
def write(src:ReadableByteChannel, target:Int): Int
def write(src:ByteBuffer, target:Int):Int
Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileDirectBufferAllocator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileDirectBufferAllocator.scala?rev=1162914&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileDirectBufferAllocator.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileDirectBufferAllocator.scala Mon Aug 29 17:43:03 2011
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store
+
+import org.fusesource.hawtdispatch.BaseRetained
+import java.nio.channels.{FileChannel, WritableByteChannel, ReadableByteChannel}
+import java.io._
+import org.apache.activemq.apollo.util._
+import java.nio.channels.FileChannel.MapMode
+import java.security.{AccessController, PrivilegedAction}
+import java.nio.{MappedByteBuffer, ByteBuffer}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{ConcurrentLinkedQueue, ConcurrentHashMap, TimeUnit}
+import java.util.Comparator
+
+/**
+ * <p>Tracks allocated space</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class Allocation(offset:Long, size:Int) {
+
+  // Callback used to hand this allocation back; it stays unset (null) until
+  // an allocator assigns it.
+  var _free_func: (Allocation)=>Unit = _
+
+  /** Releases this allocation by invoking the installed free callback. */
+  def free(): Unit = _free_func.apply(this)
+}
+
+object Range {
+  // Builds the range which covers the same space as the given allocation.
+  def apply(a:Allocation):Range = new Range(a.offset, a.size)
+}
+
+/**
+ * A contiguous span of space described by its starting offset and its length.
+ */
+case class Range(offset:Long, size:Long) {
+
+  /**
+   * Cuts this range in two: the leading `request` units and whatever follows.
+   * The request must be strictly smaller than the range.
+   */
+  def split(request:Int):(Range, Range) = {
+    assert(request < size)
+    val boundary = offset + request
+    (Range(offset, request), Range(boundary, size - request))
+  }
+
+  /**
+   * Merges this range with the range which starts exactly where this one ends.
+   */
+  def join(that:Range):Range = {
+    assert( offset+size == that.offset )
+    Range(offset, that.size + size)
+  }
+
+}
+
+trait Allocator {
+
+  /** Reserves `request` units of space; `chain` treats a null result as a failed attempt. */
+  def alloc(request:Int):Allocation
+
+  /**
+   * Composes this allocator with a fallback: the returned allocator asks this
+   * one first and only consults `that` when this one yields null.
+   */
+  def chain(that:Allocator):Allocator = {
+    val primary = this
+    new Allocator() {
+      def alloc(request: Int): Allocation = {
+        val attempt = primary.alloc(request)
+        if( attempt != null ) attempt else that.alloc(request)
+      }
+    }
+  }
+}
+
+/**
+ * <p>Manages allocation space using a couple of trees to track the free areas.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class TreeAllocator(range:Range) extends Allocator {
+
+  // The free areas ordered by size and then by offset, so that ceilingEntry
+  // finds the smallest area which is able to hold a request.
+  val free_by_size = new TreeMap[Range, Zilch](new Comparator[Range] {
+    def compare(p1: Range, p2: Range) = {
+      var rc = p1.size - p2.size
+      // Only fall back to the offset when the sizes tie. Otherwise two
+      // distinct free areas of the same size would be treated as one key.
+      if( rc==0 ) {
+        rc = p1.offset - p2.offset
+      }
+      if ( rc == 0 ) {
+        0
+      } else if ( rc < 0 ) {
+        -1
+      } else {
+        1
+      }
+    }
+  })
+
+  // The free areas keyed by their starting offset; used to find neighbours.
+  val free_by_offset = new TreeMap[Long, Range]()
+
+  // Initially the whole managed range is free.
+  free_by_offset.put(range.offset, range)
+  free_by_size.put(range, null)
+
+  /**
+   * Carves `request` units out of the smallest free area which can hold them.
+   *
+   * @return the new allocation, or null when no free area is large enough.
+   */
+  def alloc(request:Int):Allocation = {
+    val spot_entry = free_by_size.ceilingEntry(Range(0,request))
+    if( spot_entry== null ) {
+      return null
+    }
+
+    val spot = spot_entry.getKey
+    free_by_size.removeEntry(spot_entry)
+    free_by_offset.remove(spot.offset)
+
+    // might be the perfect size
+    val rc = if( spot.size == request ) {
+      spot
+    } else {
+      // split the free area..
+      val (first, second) = spot.split(request)
+
+      // put the unused part back in the free maps.
+      free_by_offset.put(second.offset, second)
+      free_by_size.put(second, null)
+
+      first
+    }
+    val allocation = Allocation(rc.offset, request)
+    allocation._free_func = free
+    allocation
+  }
+
+  /**
+   * Marks the exact area described by `req` as allocated.
+   *
+   * @return false when that area is not completely inside a single free area.
+   */
+  def alloc_at(req:Allocation):Boolean = {
+    val spot_entry = free_by_offset.floorEntry(req.offset)
+    if( spot_entry== null ) {
+      return false
+    }
+
+    var spot = spot_entry.getValue
+    if( spot.offset+spot.size < req.offset+req.size ) {
+      return false
+    }
+
+    free_by_offset.removeEntry(spot_entry)
+    free_by_size.remove(spot)
+
+    // deal with excess at the front
+    if( spot.offset != req.offset ) {
+      val (prev, next) = spot.split((req.offset - spot.offset).toInt)
+      free_by_offset.put(prev.offset, prev)
+      free_by_size.put(prev, null)
+      spot = next
+    }
+
+    // deal with excess at the rear
+    if( spot.size != req.size ) {
+      val (_, next) = spot.split(req.size)
+      free_by_offset.put(next.offset, next)
+      free_by_size.put(next, null)
+    }
+
+    req._free_func = free
+    true
+  }
+
+  /**
+   * Returns the space of `allocation` to the free maps, merging it with the
+   * free areas which directly precede and/or follow it.
+   */
+  def free(allocation:Allocation):Unit = {
+
+    val prev_e = free_by_offset.floorEntry(allocation.offset)
+    val next_e = if( prev_e!=null ) {
+      prev_e.next
+    } else {
+      free_by_offset.ceilingEntry(allocation.offset)
+    }
+
+    // Only neighbours which touch the freed area can be merged with it.
+    val prev = Option(prev_e).map(_.getValue).filter( a=> a.offset+a.size == allocation.offset )
+    val next = Option(next_e).map(_.getValue).filter( a=> allocation.offset+allocation.size == a.offset )
+
+    // The allocation is no longer live, whichever way it gets merged.
+    allocation._free_func = null
+
+    val freed = Range(allocation)
+    (prev, next) match {
+      case (None, None)=>
+        free_by_size.put(freed, null)
+        free_by_offset.put(freed.offset, freed)
+
+      case (Some(before), None)=>
+        val joined = before.join(freed)
+        free_by_size.remove(before)
+        free_by_size.put(joined, null)
+        // joined starts where `before` did, so this replaces its entry.
+        free_by_offset.put(joined.offset, joined)
+
+      case (None, Some(after))=>
+        val joined = freed.join(after)
+        free_by_size.remove(after)
+        free_by_size.put(joined, null)
+
+        free_by_offset.remove(after.offset)
+        free_by_offset.put(joined.offset, joined)
+
+      case (Some(before), Some(after))=>
+        val joined = before.join(freed.join(after))
+        free_by_size.remove(before)
+        free_by_size.remove(after)
+        free_by_size.put(joined, null)
+
+        free_by_offset.remove(after.offset)
+        free_by_offset.put(joined.offset, joined)
+    }
+  }
+}
+
+/**
+ * Helps minimize the active page set by allocating in areas
+ * which had previously been allocated.
+ */
+class ActiveAllocator(val range:Range) extends Allocator {
+
+  // the inactive (cold) allocator starts out owning all the free space..
+  val inactive = new TreeAllocator(range)
+
+  // ..while the active (hot) allocator starts out with no free space at all.
+  val active = new TreeAllocator(range)
+
+  active.free_by_offset.clear
+  active.free_by_size.clear
+
+  // Requests are served from the hot area first since
+  // that should result in less vm swapping
+  val chain = active.chain(inactive)
+
+  /**
+   * Allocates from the hot area when possible and from the cold area
+   * otherwise. A non-null result is wired to be freed through this allocator.
+   */
+  def alloc(request:Int):Allocation = {
+    val allocation = chain.alloc(request)
+    if( allocation != null ) {
+      allocation._free_func = free
+    }
+    allocation
+  }
+
+  /** Freed space always goes back into the hot tree. */
+  def free(allocation:Allocation):Unit = active.free(allocation)
+
+}
+
+/**
+ * <p>The ByteBufferReleaser allows you to more eagerly deallocate byte buffers.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ByteBufferReleaser {
+
+  // Resolved once: either a function which invokes the buffer's cleaner via
+  // reflection, or a no-op when the reflective lookup fails.
+  val release: (ByteBuffer) => Unit = AccessController.doPrivileged(
+    new PrivilegedAction[(ByteBuffer) => Unit]() {
+      def run: (ByteBuffer) => Unit = {
+        try {
+          // Look up the cleaner()/clean() pair on the direct buffer implementation class.
+          val cleaner_accessor = ByteBuffer.allocateDirect(1).getClass().getMethod("cleaner")
+          cleaner_accessor.setAccessible(true)
+          val clean_call = cleaner_accessor.getReturnType().getMethod("clean")
+
+          val cleaning: (ByteBuffer) => Unit = (buffer) => {
+            try {
+              val cleaner = cleaner_accessor.invoke(buffer)
+              if (cleaner != null) {
+                clean_call.invoke(cleaner)
+              }
+              ()
+            } catch {
+              case e: Throwable => e.printStackTrace
+            }
+          }
+          cleaning
+        } catch {
+          case _: Throwable =>
+            val nothing: (ByteBuffer) => Unit = (buffer) => ()
+            nothing
+        }
+      }
+    }
+  )
+}
+
+object FileDirectBufferAllocator {
+  val OS = System.getProperty("os.name").toLowerCase
+
+  // When the system property is not set the default depends on the OS:
+  // mmap is faster on the mac than the FileChannel.transferTo call.
+  val MMAP_TRANSFER_TO = Option(System.getProperty("apollo.MMAP_TRANSFER_TO")) match {
+    case Some(setting) => setting == "true"
+    case None => OS.startsWith("mac")
+  }
+
+  // When the system property is not set this is disabled on every OS.
+  val MMAP_TRANSFER_FROM = Option(System.getProperty("apollo.MMAP_TRANSFER_FROM")) match {
+    case Some(setting) => setting == "true"
+    case None => false
+  }
+}
+
+class FileDirectBufferAllocator(val file:File) extends DirectBufferAllocator {
+  import FileDirectBufferAllocator._
+
+  // A bare file name has no parent directory to create.
+  Option(file.getParentFile).foreach(_.mkdirs())
+
+  val allocator = new TreeAllocator(Range(0, Long.MaxValue))
+  val channel:FileChannel = new RandomAccessFile(file, "rw").getChannel
+  // Allocations of disposed buffers; they are handed back to the allocator on the next alloc.
+  val free_queue = new ConcurrentLinkedQueue[Allocation]()
+  // High water mark: the end of the furthest allocation handed out by alloc.
+  var current_size = 0L
+  var _mmap:MappedByteBuffer = _
+
+  // Any previous content of the file is discarded.
+  channel.truncate(0);
+
+  /** Releases the memory mapping (if any) and closes the file channel. */
+  def close() = {
+    if(_mmap!=null) {
+      ByteBufferReleaser.release(_mmap)
+      _mmap = null
+    }
+    channel.close()
+  }
+
+  /**
+   * Returns a view of `size` bytes starting at `offset` of the memory mapped
+   * file, (re)mapping the file first when the mapping does not cover it yet.
+   */
+  def mmap_slice(offset:Long, size:Int) = {
+    if( _mmap == null ) {
+      _mmap = channel.map(MapMode.READ_WRITE, 0, current_size)
+    }
+
+    // remaps more of the file when needed.
+    if( _mmap.capacity < offset+size ) {
+      assert(current_size >= offset+size)
+      ByteBufferReleaser.release(_mmap)
+
+      val grow = 1024*1024*64
+      _mmap = channel.map(MapMode.READ_WRITE, 0, current_size+grow)
+
+      // initialize the grown part...
+      _mmap.position(current_size.toInt)
+      while(_mmap.hasRemaining) {
+        _mmap.put(0.toByte)
+      }
+      current_size += grow
+      _mmap.clear
+    }
+
+    _mmap.position(offset.toInt)
+    _mmap.limit(offset.toInt+size)
+    val slice = _mmap.slice
+    _mmap.clear
+    slice
+  }
+
+  /**
+   * <p>A DirectBuffer which was allocated on a file.</p>
+   *
+   * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+   */
+  class AllocationBuffer(val allocation:Allocation) extends BaseRetained with DirectBuffer {
+
+    def file = FileDirectBufferAllocator.this.file
+    def offset: Long = allocation.offset
+    def size: Int = allocation.size.toInt
+
+    // The mapped view is needed by both mmap based transfer directions, so it
+    // has to exist when either of them is enabled.
+    var buffer = if( MMAP_TRANSFER_TO || MMAP_TRANSFER_FROM ) {
+      mmap_slice(offset, size)
+    } else {
+      null
+    }
+
+    // Queues the allocation so that the next alloc hands it back to the allocator.
+    override def dispose: Unit = {
+      free_queue.add(allocation)
+      if( buffer!=null ) {
+        ByteBufferReleaser.release(buffer)
+        buffer = null
+      }
+      super.dispose
+    }
+
+    /** Number of bytes between position `pos` and the end of this buffer. */
+    def remaining(pos: Int): Int = size-pos
+
+    /** Runs `func` and prints how long it took, in milliseconds, to stdout. */
+    def time[T](name:String)(func: =>T):T = {
+      val c = new TimeCounter
+      try {
+        c.time(func)
+      } finally {
+        println("%s: %.2f".format(name, c.apply(true).maxTime(TimeUnit.MILLISECONDS)))
+      }
+    }
+
+    /**
+     * Transfers the bytes from position `src` up to the end of this buffer to
+     * the target channel.
+     *
+     * @return the number of bytes which were actually transferred.
+     */
+    def read(src: Int, target: WritableByteChannel): Int = {
+      assert(retained > 0)
+      val count: Int = remaining(src)
+      assert(count>=0)
+
+      if( MMAP_TRANSFER_TO ) {
+        buffer.position(src);
+        buffer.limit(src+count)
+        val slice = buffer.slice();
+        try {
+          target.write(slice)
+        } finally {
+          ByteBufferReleaser.release(slice)
+        }
+      } else {
+        channel.transferTo(offset+src, count, target).toInt
+      }
+    }
+
+    /**
+     * Fills this buffer, starting at position `target`, with bytes read from
+     * the source channel.
+     *
+     * @return the number of bytes which were actually transferred.
+     */
+    def write(src: ReadableByteChannel, target:Int): Int = {
+      assert(retained > 0)
+      val count: Int = remaining(target)
+      assert(count>=0)
+
+      if( MMAP_TRANSFER_FROM ) {
+        buffer.position(target);
+        buffer.limit(target+count)
+        val slice = buffer.slice();
+        try {
+          src.read(slice)
+        } finally {
+          ByteBufferReleaser.release(slice)
+        }
+      } else {
+        channel.transferFrom(src, offset+target, count).toInt
+      }
+    }
+
+    /**
+     * Copies the whole content of `src` into this buffer.
+     *
+     * @throws IllegalArgumentException when the two buffers differ in size.
+     * @throws EOFException when `src` stops producing data early.
+     */
+    def copy(src: DirectBuffer): Unit = {
+      if( src.size != this.size ) {
+        throw new IllegalArgumentException("src buffer does not match the size of this buffer")
+      }
+      // src.read writes at the channel's current position, so the channel has
+      // to be moved to the start of this allocation first.
+      channel.position(offset)
+      var pos = 0
+      // A single read may transfer fewer bytes than requested.
+      while( pos < size ) {
+        val count = src.read(pos, channel)
+        if( count <= 0 ) {
+          throw new EOFException()
+        }
+        pos += count
+      }
+    }
+
+    /** Writes the whole content of this buffer to the target stream. */
+    def read(target: OutputStream): Unit = {
+      assert(retained > 0)
+      val b = ByteBuffer.allocate(size.min(1024*4))
+      var pos = 0
+      while( remaining(pos)> 0 ) {
+        b.clear
+        // never read past the end of this allocation.
+        b.limit(remaining(pos).min(b.capacity))
+        val count = channel.read(b, offset+pos)
+        if( count == -1 ) {
+          throw new EOFException()
+        }
+        target.write(b.array, 0, count)
+        pos += count
+      }
+    }
+
+    /**
+     * Writes bytes of `src` into this buffer starting at position `target`.
+     * The limit of `src` is lowered temporarily when it holds more bytes than
+     * fit, and is restored afterwards.
+     *
+     * @return the number of bytes which were written.
+     */
+    def write(src: ByteBuffer, target: Int): Int = {
+      assert(retained > 0)
+      val diff = src.remaining - remaining(target)
+      if( diff > 0 ) {
+        src.limit(src.limit-diff)
+      }
+      try {
+        channel.write(src, offset+target).toInt
+      } finally {
+        if( diff > 0 ) {
+          src.limit(src.limit+diff)
+        }
+      }
+    }
+
+    /**
+     * Fills this whole buffer with bytes read from the given stream.
+     *
+     * @throws EOFException when the stream ends before the buffer is full.
+     */
+    def write(target: InputStream): Unit = {
+      assert(retained > 0)
+      val b = ByteBuffer.allocate(size.min(1024*4))
+      var pos = 0
+      while( remaining(pos)> 0 ) {
+        val max = remaining(pos).min(b.capacity)
+        val count = target.read(b.array, 0, max)
+        if( count == -1 ) {
+          throw new EOFException()
+        }
+        // Write exactly the bytes just read, at their place inside this
+        // allocation rather than at the channel's current position.
+        b.clear
+        b.limit(count)
+        while( b.hasRemaining ) {
+          channel.write(b, offset+pos+b.position)
+        }
+        pos += count
+      }
+    }
+  }
+
+  /** Allocates a new buffer of `size` bytes on the file. */
+  def alloc(size: Int) = {
+    drain_free_allocations
+    val allocation = allocator.alloc(size)
+    assert(allocation!=null)
+    current_size = current_size.max(allocation.offset + allocation.size)
+    new AllocationBuffer(allocation)
+  }
+
+  /** Marks the given area as allocated; false when it is not entirely free. */
+  def alloc_at(offset:Long, size:Int) = {
+    allocator.alloc_at(Allocation(offset, size))
+  }
+
+  /** Marks the given area as free again. */
+  def free(offset:Long, size:Int) = {
+    allocator.free(Allocation(offset, size))
+  }
+
+  /** Creates a buffer over the given area without consulting the allocator. */
+  def slice(offset:Long, size:Int) = {
+    new AllocationBuffer(Allocation(offset, size))
+  }
+
+  /** Hands all the queued allocations of disposed buffers back to the allocator. */
+  def drain_free_allocations = {
+    var allocation = free_queue.poll()
+    while( allocation!=null ) {
+      allocator.free(allocation)
+      allocation = free_queue.poll()
+    }
+  }
+
+  /** Allocates a buffer of the same size as `source` and copies its content. */
+  def copy(source:DirectBuffer) = {
+    val rc = alloc(source.size)
+    rc.copy(source)
+    rc
+  }
+
+  /** Forces the file content and metadata to be written to the storage device. */
+  def sync = {
+    channel.force(true)
+  }
+}
+
+
+/**
+ * <p>A DirectBufferAllocator which gives every allocating thread its own file backed allocator inside a directory.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ConcurrentFileDirectBufferAllocator(val directory:File) extends DirectBufferAllocator {
+  import FileDirectBufferAllocator._
+
+  // Source of the numbers used in the per thread file names.
+  final val context_counter = new AtomicInteger();
+  // One file backed allocator per thread which has allocated so far.
+  final val contexts = new ConcurrentHashMap[Thread, FileDirectBufferAllocator]();
+
+  @volatile
+  var closed = false;
+
+  directory.mkdirs
+
+  /** Closes every per thread allocator and rejects new threads from now on. */
+  def close() = {
+    closed = true;
+    val open_contexts = contexts.values().iterator()
+    while( open_contexts.hasNext ) {
+      open_contexts.next().close
+    }
+    contexts.clear
+  }
+
+  /**
+   * Allocates from the calling thread's own allocator, creating that
+   * allocator (and its file) on the thread's first request.
+   */
+  def alloc(size: Int): DirectBuffer = {
+    val owner: Thread = Thread.currentThread()
+    val existing = contexts.get(owner)
+    val ctx = if( existing != null ) {
+      existing
+    } else if( closed ) {
+      throw new IllegalStateException("Stopped");
+    } else {
+      val id = context_counter.incrementAndGet();
+      val created = new FileDirectBufferAllocator(new File(directory, "zerocp-"+id+".data" ))
+      contexts.put(owner, created);
+      created
+    }
+    ctx.alloc(size)
+  }
+
+}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala Mon Aug 29 17:43:03 2011
@@ -19,7 +19,7 @@ package org.apache.activemq.apollo.broke
import org.fusesource.hawtbuf.AsciiBuffer
import org.fusesource.hawtbuf.Buffer
-import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
+import java.util.concurrent.atomic.AtomicReference
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -30,7 +30,7 @@ class MessageRecord {
var protocol: AsciiBuffer = _
var size = 0
var buffer: Buffer = _
- var zero_copy_buffer: ZeroCopyBuffer = _
+ var direct_buffer: DirectBuffer = _
var expiration = 0L
var locator:AtomicReference[Array[Byte]] = _
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Mon Aug 29 17:43:03 2011
@@ -43,12 +43,6 @@ trait Store extends ServiceTrait {
def get_store_status(callback:(StoreStatusDTO)=>Unit)
/**
- * @returns a ZeroCopyBufferAllocator if the store supports protocols
- * using zero copy buffers when transfering messages.
- */
- def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = null
-
- /**
* Creates a store uow which is used to perform persistent
* operations as unit of work.
*/
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Mon Aug 29 17:43:03 2011
@@ -84,12 +84,18 @@ public class VirtualHostDTO extends Serv
public LogCategoryDTO log_category;
/**
+ * If set, the broker will avoid allocating messages larger than the configured
+ * setting on the JVM heap. They will be held in temp files until consumed or persisted.
+ */
+ @XmlElement(name="heap_bypass")
+ public Integer heap_bypass;
+
+ /**
* To hold any other non-matching XML elements
*/
@XmlAnyElement(lax=true)
public List<Object> other = new ArrayList<Object>();
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -104,6 +110,7 @@ public class VirtualHostDTO extends Serv
if (auto_create_destinations != null ? !auto_create_destinations.equals(that.auto_create_destinations) : that.auto_create_destinations != null)
return false;
if (dsubs != null ? !dsubs.equals(that.dsubs) : that.dsubs != null) return false;
+ if (heap_bypass != null ? !heap_bypass.equals(that.heap_bypass) : that.heap_bypass != null) return false;
if (host_names != null ? !host_names.equals(that.host_names) : that.host_names != null) return false;
if (log_category != null ? !log_category.equals(that.log_category) : that.log_category != null) return false;
if (other != null ? !other.equals(that.other) : that.other != null) return false;
@@ -132,6 +139,7 @@ public class VirtualHostDTO extends Serv
result = 31 * result + (regroup_connections != null ? regroup_connections.hashCode() : 0);
result = 31 * result + (authentication != null ? authentication.hashCode() : 0);
result = 31 * result + (log_category != null ? log_category.hashCode() : 0);
+ result = 31 * result + (heap_bypass != null ? heap_bypass.hashCode() : 0);
result = 31 * result + (other != null ? other.hashCode() : 0);
return result;
}
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala Mon Aug 29 17:43:03 2011
@@ -61,8 +61,6 @@ class HawtDBStore(val config:HawtDBStore
protected def get_next_msg_key = next_msg_key.getAndIncrement
- override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = null
-
protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
write_executor {
client.store(uows, ^{
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Mon Aug 29 17:43:03 2011
@@ -50,15 +50,14 @@ object JDBM2Client extends Log {
def deserialize(in: SerializerInput) = decode_queue_entry_record(in)
}
- object ZeroCopyValueSerializer extends Serializer[(Int, Long, Int)] {
- def serialize(out: SerializerOutput, v: (Int,Long, Int)) = {
- out.writePackedInt(v._1)
- out.writePackedLong(v._2)
- out.writePackedInt(v._3)
+ object ZeroCopyValueSerializer extends Serializer[(Long, Int)] {
+ def serialize(out: SerializerOutput, v: (Long, Int)) = {
+ out.writePackedLong(v._1)
+ out.writePackedInt(v._2)
}
def deserialize(in: SerializerInput) = {
- (in.readPackedInt, in.readPackedLong, in.readPackedInt)
+ (in.readPackedLong, in.readPackedInt)
}
}
@@ -163,18 +162,18 @@ class JDBM2Client(store: JDBM2Store) {
var queues_db:HTree[Long, QueueRecord] = _
var entries_db:BTree[(Long,Long), QueueEntryRecord] = _
var messages_db:HTree[Long, MessagePB.Buffer] = _
- var zerocp_db:HTree[Long, (Int, Long, Int)] = _
+ var lobs_db:HTree[Long, (Long, Int)] = _
var message_refs_db:HTree[Long, java.lang.Integer] = _
var map_db:HTree[Buffer, Buffer] = _
var last_message_key = 0L
var last_queue_key = 0L
- var zero_copy_buffer_allocator: FileZeroCopyBufferAllocator = _
+ var direct_buffer_allocator: FileDirectBufferAllocator = _
- def zero_copy_dir = {
+ def direct_buffer_file = {
import FileSupport._
- config.directory / "zerocp"
+ config.directory / "dbuffer.data"
}
def start() = {
@@ -184,10 +183,7 @@ class JDBM2Client(store: JDBM2Store) {
config.directory.mkdirs
- if( Option(config.zero_copy).map(_.booleanValue).getOrElse(false) ) {
- zero_copy_buffer_allocator = new FileZeroCopyBufferAllocator(zero_copy_dir)
- zero_copy_buffer_allocator.start
- }
+ direct_buffer_allocator = new FileDirectBufferAllocator(direct_buffer_file)
recman = RecordManagerFactory.createRecordManager((config.directory / "jdbm2").getCanonicalPath)
@@ -220,7 +216,7 @@ class JDBM2Client(store: JDBM2Store) {
transaction {
messages_db = init_htree("messages", value_serializer = MessageRecordSerializer)
map_db = init_htree("map", value_serializer = BufferSerializer, key_serializer = BufferSerializer)
- zerocp_db = init_htree("lobs", value_serializer = ZeroCopyValueSerializer)
+ lobs_db = init_htree("lobs", value_serializer = ZeroCopyValueSerializer)
message_refs_db = init_htree("message_refs")
queues_db = init_htree("queues", value_serializer = QueueRecordSerializer)
entries_db = init_btree("enttries", new QueueEntryKeyComparator, QueueEntryKeySerializer, QueueEntryRecordSerializer)
@@ -228,11 +224,9 @@ class JDBM2Client(store: JDBM2Store) {
last_message_key = Option(recman.getNamedObject("last_message_key")).map(_.longValue).getOrElse(0L)
last_queue_key = Option(recman.getNamedObject("last_queue_key")).map(_.longValue).getOrElse(0L)
- if( zero_copy_buffer_allocator!=null ) {
- zerocp_db.cursor { (_,v)=>
- zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
- true
- }
+ lobs_db.cursor { (_,v)=>
+ direct_buffer_allocator.alloc_at(v._1, v._2)
+ true
}
}
@@ -241,10 +235,8 @@ class JDBM2Client(store: JDBM2Store) {
def stop() = {
recman.close
recman = null;
- if( zero_copy_buffer_allocator!=null ) {
- zero_copy_buffer_allocator.stop
- zero_copy_buffer_allocator = null
- }
+ direct_buffer_allocator.close
+ direct_buffer_allocator = null
}
def transaction[T](func: => T): T = {
@@ -268,8 +260,8 @@ class JDBM2Client(store: JDBM2Store) {
if( config.directory.isDirectory ) {
config.directory.listFiles.filter(_.getName.startsWith("jdbm2.")).foreach(_.delete)
}
- if( zero_copy_dir.isDirectory ) {
- zero_copy_dir.listFiles.foreach(_.delete)
+ if( direct_buffer_file.isDirectory ) {
+ direct_buffer_file.listFiles.foreach(_.delete)
}
}
if( recman!=null ) {
@@ -324,12 +316,10 @@ class JDBM2Client(store: JDBM2Store) {
gc.foreach { key=>
message_refs_db.remove(key)
messages_db.remove(key)
- if( zero_copy_buffer_allocator!=null ){
- val location = zerocp_db.find(key)
- if( location!=null ) {
- zero_copy_buffer_allocator.free(location._1, location._2, location._3)
- zerocp_db.remove(key)
- }
+ val location = lobs_db.find(key)
+ if( location!=null ) {
+ direct_buffer_allocator.free(location._1, location._2)
+ lobs_db.remove(key)
}
}
}
@@ -368,7 +358,7 @@ class JDBM2Client(store: JDBM2Store) {
def store(uows: Seq[JDBM2Store#DelayableUOW], callback:Runnable) {
transaction {
- var zcp_files_to_sync = Set[Int]()
+ var direct_buffer_sync = false
uows.foreach { uow =>
for((key,value) <- uow.map_actions) {
@@ -384,14 +374,13 @@ class JDBM2Client(store: JDBM2Store) {
val message_record = action.message_record
if (message_record != null) {
- val pb = if( message_record.zero_copy_buffer != null ) {
+ val pb = if( message_record.direct_buffer != null ) {
val r = to_pb(action.message_record).copy
- val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
- r.setZcpFile(buffer.file)
- r.setZcpOffset(buffer.offset)
- r.setZcpSize(buffer.size)
- zerocp_db.put(message_record.key, (buffer.file, buffer.offset, buffer.size))
- zcp_files_to_sync += buffer.file
+ val buffer = direct_buffer_allocator.copy(message_record.direct_buffer)
+ r.setDirectOffset(buffer.offset)
+ r.setDirectSize(buffer.size)
+ lobs_db.put(message_record.key, (buffer.offset, buffer.size))
+ direct_buffer_sync = true
r.freeze
} else {
to_pb(action.message_record)
@@ -416,8 +405,8 @@ class JDBM2Client(store: JDBM2Store) {
}
}
- if( zero_copy_buffer_allocator!=null ) {
- zcp_files_to_sync.foreach(zero_copy_buffer_allocator.sync(_))
+ if( direct_buffer_sync ) {
+ direct_buffer_allocator.sync
}
}
callback.run
@@ -489,8 +478,8 @@ class JDBM2Client(store: JDBM2Store) {
val record = metric_load_from_index_counter.time {
Option(messages_db.find(message_key)).map{ pb=>
val rc = from_pb(pb)
- if( pb.hasZcpFile ) {
- rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
+ if( pb.hasDirectSize ) {
+ rc.direct_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
}
rc
}
@@ -531,16 +520,10 @@ class JDBM2Client(store: JDBM2Store) {
}
streams.using_message_stream { message_stream=>
messages_db.cursor { (_, pb) =>
- if( pb.hasZcpFile ) {
- val zcpb = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
- var data = pb.copy
- data.clearZcpFile
- data.clearZcpFile
- // write the pb frame and then the direct buffer data..
- data.freeze.writeFramed(message_stream)
- zcpb.read(message_stream)
- } else {
- pb.writeFramed(message_stream)
+ pb.writeFramed(message_stream)
+ if( pb.hasDirectSize ) {
+ val buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
+ buffer.read(message_stream)
}
true
}
@@ -608,27 +591,18 @@ class JDBM2Client(store: JDBM2Store) {
recman.commit
- var zcp_counter = 0
- val max_ctx = zero_copy_buffer_allocator.contexts.size
-
streams.using_message_stream { message_stream=>
foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
- val record:MessagePB.Buffer = if( pb.hasZcpSize ) {
- val cp = pb.copy
- val zcpb = zero_copy_buffer_allocator.contexts(zcp_counter % max_ctx).alloc(cp.getZcpSize)
- cp.setZcpFile(zcpb.file)
- cp.setZcpOffset(zcpb.offset)
-
- zcp_counter += 1
- zcpb.write(message_stream)
-
- zerocp_db.put(pb.getMessageKey, (zcpb.file, zcpb.offset, zcpb.size))
- cp.freeze
- } else {
- pb
+ var record:MessagePB.Buffer = pb
+ if( pb.hasDirectSize ) {
+ val cp = record.copy
+ val buffer = direct_buffer_allocator.alloc(cp.getDirectSize)
+ cp.setDirectOffset(buffer.offset)
+ buffer.write(message_stream)
+ lobs_db.put(pb.getMessageKey, (buffer.offset, buffer.size))
+ record = cp.freeze
}
-
messages_db.put(record.getMessageKey, record)
check_flush(record.getSize, 1024*124*10)
}
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Mon Aug 29 17:43:03 2011
@@ -54,8 +54,6 @@ class JDBM2Store(var config:JDBM2StoreDT
protected def get_next_msg_key = next_msg_key.getAndIncrement
- override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = client.zero_copy_buffer_allocator
-
protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
executor {
client.store(uows, ^{
Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/dto/JDBM2StoreDTO.java Mon Aug 29 17:43:03 2011
@@ -37,9 +37,6 @@ public class JDBM2StoreDTO extends Store
@XmlAttribute(name="compact_interval")
public Integer compact_interval;
- @XmlAttribute(name="zero_copy")
- public Boolean zero_copy;
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -51,7 +48,6 @@ public class JDBM2StoreDTO extends Store
if (compact_interval != null ? !compact_interval.equals(that.compact_interval) : that.compact_interval != null)
return false;
if (directory != null ? !directory.equals(that.directory) : that.directory != null) return false;
- if (zero_copy != null ? !zero_copy.equals(that.zero_copy) : that.zero_copy != null) return false;
return true;
}
@@ -61,7 +57,6 @@ public class JDBM2StoreDTO extends Store
int result = super.hashCode();
result = 31 * result + (directory != null ? directory.hashCode() : 0);
result = 31 * result + (compact_interval != null ? compact_interval.hashCode() : 0);
- result = 31 * result + (zero_copy != null ? zero_copy.hashCode() : 0);
return result;
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Mon Aug 29 17:43:03 2011
@@ -30,7 +30,7 @@ import org.apache.activemq.apollo.transp
import _root_.org.fusesource.hawtbuf._
import Buffer._
import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.broker.store.{ZeroCopyBuffer, ZeroCopyBufferAllocator, MessageRecord}
+import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord}
import org.apache.activemq.apollo.util.Log._
object StompCodec extends Log {
@@ -46,10 +46,10 @@ object StompCodec extends Log {
rc.expiration = message.expiration
if( frame.content.isInstanceOf[ZeroCopyContent] ) {
- rc.zero_copy_buffer = frame.content.asInstanceOf[ZeroCopyContent].zero_copy_buffer
+ rc.direct_buffer = frame.content.asInstanceOf[ZeroCopyContent].zero_copy_buffer
}
- def buffer_size = if (rc.zero_copy_buffer!=null) { frame.size - (rc.zero_copy_buffer.size - 1) } else { frame.size }
+ def buffer_size = if (rc.direct_buffer!=null) { frame.size - (rc.direct_buffer.size - 1) } else { frame.size }
val os = new ByteArrayOutputStream(buffer_size)
frame.action.writeTo(os)
@@ -82,7 +82,7 @@ object StompCodec extends Log {
os.write(NEWLINE)
}
os.write(NEWLINE)
- if ( rc.zero_copy_buffer==null ) {
+ if ( rc.direct_buffer==null ) {
frame.content.writeTo(os)
}
}
@@ -127,10 +127,10 @@ object StompCodec extends Log {
line = read_line
}
- if( message.zero_copy_buffer==null ) {
+ if( message.direct_buffer==null ) {
new StompFrameMessage(new StompFrame(action, headers.toList, BufferContent(buffer)))
} else {
- new StompFrameMessage(new StompFrame(action, headers.toList, ZeroCopyContent(message.zero_copy_buffer)))
+ new StompFrameMessage(new StompFrame(action, headers.toList, ZeroCopyContent(message.direct_buffer)))
}
}
@@ -143,7 +143,7 @@ class StompCodec extends ProtocolCodec {
var max_headers = 1000
var max_data_length = 1024 * 1024 * 100
- var zero_copy_buffer_allocator:ZeroCopyBufferAllocator = null
+ var direct_buffer_allocator:DirectBufferAllocator = null
implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
implicit def wrap(x: Byte) = {
@@ -164,10 +164,10 @@ class StompCodec extends ProtocolCodec {
var write_channel:WritableByteChannel = null
var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
- var next_write_direct:ZeroCopyBuffer = null
+ var next_write_direct:DirectBuffer = null
var write_buffer = ByteBuffer.allocate(0)
- var write_direct:ZeroCopyBuffer = null
+ var write_direct:DirectBuffer = null
var write_direct_pos = 0
var last_write_io_size = 0
@@ -313,7 +313,7 @@ class StompCodec extends ProtocolCodec {
var last_read_io_size = 0
- var read_direct:ZeroCopyBuffer = null
+ var read_direct:DirectBuffer = null
var read_direct_pos = 0
var next_action:FrameReader = read_action
@@ -480,9 +480,9 @@ class StompCodec extends ProtocolCodec {
// lets try to keep the content of big message outside of the JVM's garbage collection
// to keep the number of GCs down when moving big messages.
def is_message = action == SEND || action == MESSAGE
- if( length > 1024 && zero_copy_buffer_allocator!=null && is_message) {
+ if( length > 1024 && direct_buffer_allocator!=null && is_message) {
- read_direct = zero_copy_buffer_allocator.alloc(length)
+ read_direct = direct_buffer_allocator.alloc(length)
val dup = buffer.duplicate
dup.position(read_start)
@@ -529,7 +529,7 @@ class StompCodec extends ProtocolCodec {
null
}
- def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:ZeroCopyBuffer):FrameReader = (buffer)=> {
+ def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:DirectBuffer):FrameReader = (buffer)=> {
if( read_frame_terminator(buffer, contentLength) ) {
next_action = read_action
new StompFrame(ascii(action), headers.toList, ZeroCopyContent(ma))
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Mon Aug 29 17:43:03 2011
@@ -21,7 +21,7 @@ import collection.mutable.ListBuffer
import java.lang.{String, Class}
import org.apache.activemq.apollo.broker._
import java.io.OutputStream
-import org.apache.activemq.apollo.broker.store.ZeroCopyBuffer
+import org.apache.activemq.apollo.broker.store.DirectBuffer
import org.apache.activemq.apollo.dto.DestinationDTO
/**
@@ -194,7 +194,7 @@ case class BufferContent(content:Buffer)
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-case class ZeroCopyContent(zero_copy_buffer:ZeroCopyBuffer) extends StompContent {
+case class ZeroCopyContent(zero_copy_buffer:DirectBuffer) extends StompContent {
def length = zero_copy_buffer.size-1
def writeTo(os:OutputStream) = {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Aug 29 17:43:03 2011
@@ -795,11 +795,7 @@ class StompProtocolHandler extends Proto
}
connection_sink.offer(StompFrame(CONNECTED,connected_headers.toList))
-
- if( this.host.store!=null && this.host.store.zero_copy_buffer_allocator!=null ) {
- val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
- wf.zero_copy_buffer_allocator = this.host.store.zero_copy_buffer_allocator
- }
+ codec.direct_buffer_allocator = this.host.direct_buffer_allocator
}
reset {
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1162914&r1=1162913&r2=1162914&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Mon Aug 29 17:43:03 2011
@@ -96,7 +96,7 @@
<xbean-version>3.4</xbean-version>
<felix-version>1.0.0</felix-version>
- <hawtdispatch-version>1.4</hawtdispatch-version>
+ <hawtdispatch-version>1.5-SNAPSHOT</hawtdispatch-version>
<hawtbuf-version>1.6</hawtbuf-version>
<jdbm-version>2.0.1</jdbm-version>