You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/01 14:09:44 UTC
svn commit: r1130122 - in /activemq/activemq-apollo/trunk: ./
apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-broker/src/main/scala/org/apache/activemq...
Author: chirino
Date: Wed Jun 1 12:09:44 2011
New Revision: 1130122
URL: http://svn.apache.org/viewvc?rev=1130122&view=rev
Log:
BDB enhancements: Switch to only using 1 btree to track all queue entries, use var long keys to fit more records / page. Better error handling and retry logic.
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1130122&r1=1130121&r2=1130122&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Wed Jun 1 12:09:44 2011
@@ -27,6 +27,9 @@ import org.apache.activemq.apollo.util._
import com.sleepycat.je._
import java.io.{EOFException, InputStream, OutputStream}
import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory}
+import org.apache.activemq.apollo.util.Log._
+import scala.Some
+import java.sql.ClientInfoStatus
object BDBClient extends Log
/**
@@ -89,7 +92,7 @@ class BDBClient(store: BDBStore) {
if( zero_copy_buffer_allocator!=null ) {
zerocp_db.cursor(tx) { (_,value)=>
- val v = encode_zcp_value(value)
+ val v = decode_zcp_value(value)
zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
true
}
@@ -116,6 +119,13 @@ class BDBClient(store: BDBStore) {
}
}
+ private var _entries_db:Database = _
+ def entries_db:Database = {
+ if( _entries_db==null ) {
+ _entries_db = environment.openDatabase(tx, "entries", long_long_key_conf)
+ }
+ _entries_db
+ }
private var _messages_db:Database = _
def messages_db:Database = {
@@ -159,6 +169,9 @@ class BDBClient(store: BDBStore) {
if( _queues_db!=null ) {
_queues_db.close
}
+ if( _entries_db!=null ) {
+ _entries_db.close
+ }
if(ok){
tx.commit
@@ -171,17 +184,41 @@ class BDBClient(store: BDBStore) {
def with_ctx[T](func: (TxContext) => T): T = {
- val ctx = TxContext(environment.beginTransaction(null, null));
- var ok = false
- try {
- val rc = func(ctx)
- ok = true
- rc
- } finally {
- ctx.close(ok)
+ var error:Throwable = null
+ var rc:Option[T] = None
+
+ // We will loop until the tx succeeds. Perhaps it's
+ // failing due to a temporary condition like low disk space.
+ while(!rc.isDefined) {
+
+
+ val ctx = TxContext(environment.beginTransaction(null, null));
+ try {
+ rc = Some(func(ctx))
+ } catch {
+ case e:Throwable =>
+ if( error==null ) {
+ warn(e, "Message store transaction failed. Will keep retrying after every second.")
+ }
+ error = e
+ } finally {
+ ctx.close(rc.isDefined)
+ }
+
+ if (!rc.isDefined) {
+ // We may need to give up if the store is being stopped.
+ if ( !store.service_state.is_started ) {
+ throw error
+ }
+ Thread.sleep(1000)
+ }
}
- }
+ if( error!=null ) {
+ info("Store recovered from inital failure.")
+ }
+ rc.get
+ }
def purge() = {
@@ -236,8 +273,6 @@ class BDBClient(store: BDBStore) {
with_ctx { ctx=>
import ctx._
queues_db.put(tx, record.key, record)
- with_entries_db(record.key) { entriesdb=>
- }
}
callback.run
}
@@ -261,17 +296,17 @@ class BDBClient(store: BDBStore) {
import ctx._
queues_db.delete(tx, queue_key)
- with_entries_db(queue_key) { entries_db=>
- entries_db.cursor(tx) { (key,value)=>
+ entries_db.cursor_from(tx, (queue_key, 0L)) { (key,value)=>
+ val current_key:(Long,Long)=key
+ if( current_key._1 == queue_key ) {
val queueEntry:QueueEntryRecord = value
decrement_message_reference(ctx, queueEntry.message_key)
true // keep cursoring..
+ } else {
+ false
}
-
}
-
- environment.removeDatabase(tx, entries_db_name(queue_key))
}
callback.run
}
@@ -305,17 +340,13 @@ class BDBClient(store: BDBStore) {
}
action.enqueues.foreach { queueEntry =>
- with_entries_db(queueEntry.queue_key) { entries_db=>
- entries_db.put(tx, queueEntry.entry_seq, queueEntry)
- add_and_get(message_refs_db, queueEntry.message_key, 1, tx)
- }
+ entries_db.put(tx, (queueEntry.queue_key, queueEntry.entry_seq), queueEntry)
+ add_and_get(message_refs_db, queueEntry.message_key, 1, tx)
}
action.dequeues.foreach { queueEntry =>
- with_entries_db(queueEntry.queue_key) { entries_db=>
- entries_db.delete(tx, queueEntry.entry_seq)
- decrement_message_reference(ctx, queueEntry.message_key)
- }
+ entries_db.delete(tx, (queueEntry.queue_key, queueEntry.entry_seq))
+ decrement_message_reference(ctx, queueEntry.message_key)
}
}
}
@@ -351,21 +382,19 @@ class BDBClient(store: BDBStore) {
var rc = ListBuffer[QueueEntryRange]()
with_ctx { ctx=>
import ctx._
+ var group:QueueEntryRange = null
- with_entries_db(queue_key) { entries_db=>
-
- var group:QueueEntryRange = null
-
- entries_db.cursor(tx) { (key, value) =>
-
+ entries_db.cursor_from(tx, (queue_key, 0L)) { (key, value) =>
+ val current_key:(Long,Long)= key
+ if( current_key._1 == queue_key ) {
if( group == null ) {
group = new QueueEntryRange
- group.first_entry_seq = key
+ group.first_entry_seq = current_key._2
}
val entry:QueueEntryRecord = value
- group.last_entry_seq = key
+ group.last_entry_seq = current_key._2
group.count += 1
group.size += entry.size
@@ -375,13 +404,16 @@ class BDBClient(store: BDBStore) {
}
true // to continue cursoring.
- }
- if( group!=null ) {
- rc += group
+ } else {
+ false
}
+ }
+ if( group!=null ) {
+ rc += group
}
+
}
rc
}
@@ -390,13 +422,17 @@ class BDBClient(store: BDBStore) {
var rc = ListBuffer[QueueEntryRecord]()
with_ctx { ctx=>
import ctx._
+ entries_db.cursor_from(tx, (queue_key, firstSeq)) { (key, value) =>
+ val current_key:(Long,Long) = key
+ if( current_key._1 == queue_key ) {
- with_entries_db(queue_key) { entries_db=>
- entries_db.cursor_from(tx, to_database_entry(firstSeq)) { (key, value) =>
- val entry_seq:Long = key
+ val entry_seq = current_key._2
val entry:QueueEntryRecord = value
rc += entry
entry_seq < lastSeq
+
+ } else {
+ false
}
}
}
@@ -406,11 +442,41 @@ class BDBClient(store: BDBStore) {
val metric_load_from_index_counter = new TimeCounter
var metric_load_from_index = metric_load_from_index_counter(false)
- def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
+ def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]):Unit = {
+
+ val missing = with_ctx { ctx=>
+ import ctx._
+ requests.flatMap { x =>
+ val (message_key, callback) = x
+ val record = metric_load_from_index_counter.time {
+ messages_db.get(tx, to_database_entry(message_key)).map{ data=>
+ import PBSupport._
+ val pb:MessagePB.Buffer = data
+ val rc = from_pb(pb)
+ if( pb.hasZcpFile ) {
+ rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
+ }
+ rc
+ }
+ }
+ if( record.isDefined ) {
+ callback(record)
+ None
+ } else {
+ Some(x)
+ }
+ }
+ }
+
+ if (missing.isEmpty)
+ return
+ // There's a small chance that a message was missing, perhaps we started a read tx, before the
+ // write tx completed. Lets try again..
with_ctx { ctx=>
import ctx._
- requests.foreach { case (message_key, callback)=>
+ missing.foreach { x =>
+ val (message_key, callback) = x
val record = metric_load_from_index_counter.time {
messages_db.get(tx, to_database_entry(message_key)).map{ data=>
import PBSupport._
@@ -478,15 +544,9 @@ class BDBClient(store: BDBStore) {
}
streams.using_queue_entry_stream { queue_entry_stream=>
- queues_db.cursor(tx) { (_, value) =>
- val record:QueueRecord = value
- with_entries_db(record.key) { entries_db=>
- entries_db.cursor(tx) { (key, value) =>
- val record:QueueEntryRecord = value
- record.writeFramed(queue_entry_stream)
- true
- }
- }
+ entries_db.cursor(tx) { (key, value) =>
+ val record:QueueEntryRecord = value
+ record.writeFramed(queue_entry_stream)
true
}
}
@@ -523,8 +583,6 @@ class BDBClient(store: BDBStore) {
foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb=>
val record:QueueRecord = pb
queues_db.put(tx, record.key, record)
- with_entries_db(record.key) { entriesdb=>
- }
}
}
@@ -557,10 +615,8 @@ class BDBClient(store: BDBStore) {
foreach[QueueEntryPB.Buffer](queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
val record:QueueEntryRecord = pb
- with_entries_db(record.queue_key) { entries_db=>
- entries_db.put(tx, record.entry_seq, record)
- add_and_get(message_refs_db, record.message_key, 1, tx)
- }
+ entries_db.put(tx, (record.queue_key, record.entry_seq), record)
+ add_and_get(message_refs_db, record.message_key, 1, tx)
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1130122&r1=1130121&r2=1130122&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala Wed Jun 1 12:09:44 2011
@@ -20,9 +20,9 @@ import java.util.Comparator
import java.nio.ByteBuffer
import com.sleepycat.je._
import java.io.Serializable
-import org.fusesource.hawtbuf.Buffer
import org.apache.activemq.apollo.broker.store._
import PBSupport._
+import org.fusesource.hawtbuf._
object HelperTrait {
@@ -35,26 +35,64 @@ object HelperTrait {
implicit def to_queue_record(entry: DatabaseEntry): QueueRecord = entry.getData
implicit def to_database_entry(v: QueueRecord): DatabaseEntry = new DatabaseEntry(v)
- implicit def encode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = {
- val e = new Buffer(entry.getData).bigEndianEditor()
- (e.readInt, e.readLong, e.readInt)
- }
- implicit def decode_zcp_value(v: (Int,Long,Int)): DatabaseEntry = {
- val buf = new Buffer(16)
- val e = buf.bigEndianEditor()
- e.writeInt(v._1)
- e.writeLong(v._2)
- e.writeInt(v._1)
- new DatabaseEntry(buf.data)
+ implicit def decode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = {
+ val in = new DataByteArrayInputStream(entry.getData)
+ (in.readVarInt(), in.readVarLong(), in.readVarInt())
+ }
+ implicit def encode_zcp_value(v: (Int,Long,Int)): DatabaseEntry = {
+ val out = new DataByteArrayOutputStream(
+ AbstractVarIntSupport.computeVarIntSize(v._1) +
+ AbstractVarIntSupport.computeVarLongSize(v._2) +
+ AbstractVarIntSupport.computeVarIntSize(v._3)
+ )
+ out.writeVarInt(v._1)
+ out.writeVarLong(v._2)
+ out.writeVarInt(v._3)
+ new DatabaseEntry(out.toBuffer.data)
+ }
+
+ implicit def to_bytes(l:Long):Array[Byte] = {
+ val out = new DataByteArrayOutputStream(AbstractVarIntSupport.computeVarLongSize(l))
+ out.writeVarLong(l)
+ out.toBuffer.data
+ }
+ implicit def to_long(bytes:Array[Byte]):Long = {
+ val in = new DataByteArrayInputStream(bytes)
+ in.readVarLong()
}
- implicit def to_bytes(l:Long):Array[Byte] = ByteBuffer.wrap(new Array[Byte](8)).putLong(l).array()
- implicit def to_long(bytes:Array[Byte]):Long = ByteBuffer.wrap(bytes).getLong()
implicit def to_database_entry(l:Long):DatabaseEntry = new DatabaseEntry(to_bytes(l))
implicit def to_long(value:DatabaseEntry):Long = to_long(value.getData)
- implicit def to_bytes(l:Int):Array[Byte] = ByteBuffer.wrap(new Array[Byte](4)).putInt(l).array()
- implicit def to_int(bytes:Array[Byte]):Int = ByteBuffer.wrap(bytes).getInt()
+ implicit def to_bytes(l:(Long, Long)):Array[Byte] = {
+ val out = new DataByteArrayOutputStream(
+ AbstractVarIntSupport.computeVarLongSize(l._1)+
+ AbstractVarIntSupport.computeVarLongSize(l._2)
+ )
+ out.writeVarLong(l._1)
+ out.writeVarLong(l._2)
+ out.toBuffer.data
+ }
+
+ implicit def to_long_long(bytes:Array[Byte]):(Long,Long) = {
+ val in = new DataByteArrayInputStream(bytes)
+ (in.readVarLong(), in.readVarLong())
+ }
+
+ implicit def to_database_entry(l:(Long,Long)):DatabaseEntry = new DatabaseEntry(to_bytes(l))
+ implicit def to_long_long(value:DatabaseEntry):(Long,Long) = to_long_long(value.getData)
+
+ implicit def to_bytes(l:Int):Array[Byte] = {
+ val out = new DataByteArrayOutputStream(AbstractVarIntSupport.computeVarIntSize(l))
+ out.writeVarInt(l)
+ out.toBuffer.data
+ }
+
+ implicit def to_int(bytes:Array[Byte]):Int = {
+ val in = new DataByteArrayInputStream(bytes)
+ in.readVarInt()
+ }
+
implicit def to_database_entry(l:Int):DatabaseEntry = new DatabaseEntry(to_bytes(l))
implicit def to_int(value:DatabaseEntry):Int = to_int(value.getData)
@@ -63,18 +101,39 @@ object HelperTrait {
class LongComparator extends Comparator[Array[Byte]] with Serializable {
def compare(o1: Array[Byte], o2: Array[Byte]) = {
- val v1:java.lang.Long = to_long(o1)
- val v2:java.lang.Long = to_long(o2)
+ val v1 = to_long(o1)
+ val v2 = to_long(o2)
v1.compareTo(v2)
}
}
+ @SerialVersionUID(2)
+ class LongLongComparator extends Comparator[Array[Byte]] with Serializable {
+
+ def compare(o1: Array[Byte], o2: Array[Byte]) = {
+ val v1 = to_long_long(o1)
+ val v2 = to_long_long(o2)
+ val rc = v1._1.compareTo(v2._1)
+ if (rc==0) {
+ v1._2.compareTo(v2._2)
+ } else {
+ rc
+ }
+ }
+
+ }
+
val long_key_conf = new DatabaseConfig();
long_key_conf.setAllowCreate(true)
long_key_conf.setTransactional(true);
long_key_conf.setBtreeComparator(new LongComparator)
+ val long_long_key_conf = new DatabaseConfig();
+ long_long_key_conf.setAllowCreate(true)
+ long_long_key_conf.setTransactional(true);
+ long_long_key_conf.setBtreeComparator(new LongLongComparator)
+
final class RichDatabase(val db: Database) extends Proxy {
def self: Any = db
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala?rev=1130122&r1=1130121&r2=1130122&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala Wed Jun 1 12:09:44 2011
@@ -28,7 +28,7 @@ class BDBStoreTest extends StoreFunSuite
def create_store(flushDelay:Long):Store = {
val rc = new BDBStore({
val rc = new BDBStoreDTO
- rc.directory = basedir / "activemq-data"
+ rc.directory = basedir / "target" / "apollo-data"
rc
})
rc.config.flush_delay = flushDelay
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala?rev=1130122&r1=1130121&r2=1130122&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala Wed Jun 1 12:09:44 2011
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.apollo.broker.store
-;
-
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1130122&r1=1130121&r2=1130122&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Wed Jun 1 12:09:44 2011
@@ -97,7 +97,7 @@
<felix-version>1.0.0</felix-version>
<hawtdispatch-version>1.3-SNAPSHOT</hawtdispatch-version>
- <hawtbuf-version>1.4</hawtbuf-version>
+ <hawtbuf-version>1.5-SNAPSHOT</hawtbuf-version>
<jdbm-version>2.0.1</jdbm-version>
<bdb-version>4.1.6</bdb-version>