You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/01 14:09:44 UTC

svn commit: r1130122 - in /activemq/activemq-apollo/trunk: ./ apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq...

Author: chirino
Date: Wed Jun  1 12:09:44 2011
New Revision: 1130122

URL: http://svn.apache.org/viewvc?rev=1130122&view=rev
Log:
BDB enhancements: Switch to using only one btree to track all queue entries; use variable-length long keys to fit more records per page.  Better error handling and retry logic.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1130122&r1=1130121&r2=1130122&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Wed Jun  1 12:09:44 2011
@@ -27,6 +27,9 @@ import org.apache.activemq.apollo.util._
 import com.sleepycat.je._
 import java.io.{EOFException, InputStream, OutputStream}
 import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory}
+import org.apache.activemq.apollo.util.Log._
+import scala.Some
+import java.sql.ClientInfoStatus
 
 object BDBClient extends Log
 /**
@@ -89,7 +92,7 @@ class BDBClient(store: BDBStore) {
 
       if( zero_copy_buffer_allocator!=null ) {
         zerocp_db.cursor(tx) { (_,value)=>
-          val v = encode_zcp_value(value)
+          val v = decode_zcp_value(value)
           zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
           true
         }
@@ -116,6 +119,13 @@ class BDBClient(store: BDBStore) {
       }
     }
 
+    private var _entries_db:Database = _
+    def entries_db:Database = {
+      if( _entries_db==null ) {
+        _entries_db = environment.openDatabase(tx, "entries", long_long_key_conf)
+      }
+      _entries_db
+    }
 
     private var _messages_db:Database = _
     def messages_db:Database = {
@@ -159,6 +169,9 @@ class BDBClient(store: BDBStore) {
       if( _queues_db!=null ) {
         _queues_db.close
       }
+      if( _entries_db!=null ) {
+        _entries_db.close
+      }
 
       if(ok){
         tx.commit
@@ -171,17 +184,41 @@ class BDBClient(store: BDBStore) {
 
 
   def with_ctx[T](func: (TxContext) => T): T = {
-    val ctx = TxContext(environment.beginTransaction(null, null));
-    var ok = false
-    try {
-      val rc = func(ctx)
-      ok = true
-      rc
-    } finally {
-      ctx.close(ok)
+    var error:Throwable = null
+    var rc:Option[T] = None
+
+    // We will loop until the tx succeeds.  Perhaps it's
+    // failing due to a temporary condition like low disk space.
+    while(!rc.isDefined) {
+
+
+      val ctx = TxContext(environment.beginTransaction(null, null));
+      try {
+        rc = Some(func(ctx))
+      } catch {
+        case e:Throwable =>
+          if( error==null ) {
+            warn(e, "Message store transaction failed. Will keep retrying after every second.")
+          }
+          error = e
+      } finally {
+        ctx.close(rc.isDefined)
+      }
+
+      if (!rc.isDefined) {
+        // We may need to give up if the store is being stopped.
+        if ( !store.service_state.is_started ) {
+          throw error
+        }
+        Thread.sleep(1000)
+      }
     }
-  }
 
+    if( error!=null ) {
+      info("Store recovered from inital failure.")
+    }
+    rc.get
+  }
 
   def purge() = {
 
@@ -236,8 +273,6 @@ class BDBClient(store: BDBStore) {
     with_ctx { ctx=>
       import ctx._
       queues_db.put(tx, record.key, record)
-      with_entries_db(record.key) { entriesdb=> 
-      }
     }
     callback.run
   }
@@ -261,17 +296,17 @@ class BDBClient(store: BDBStore) {
       import ctx._
 
       queues_db.delete(tx, queue_key)
-      with_entries_db(queue_key) { entries_db=>
 
-        entries_db.cursor(tx) { (key,value)=>
+      entries_db.cursor_from(tx, (queue_key, 0L)) { (key,value)=>
+        val current_key:(Long,Long)=key
+        if( current_key._1 == queue_key ) {
           val queueEntry:QueueEntryRecord = value
           decrement_message_reference(ctx, queueEntry.message_key)
           true // keep cursoring..
+        } else {
+          false
         }
-
       }
-
-      environment.removeDatabase(tx, entries_db_name(queue_key))
     }
     callback.run
   }
@@ -305,17 +340,13 @@ class BDBClient(store: BDBStore) {
               }
 
               action.enqueues.foreach { queueEntry =>
-                with_entries_db(queueEntry.queue_key) { entries_db=>
-                  entries_db.put(tx, queueEntry.entry_seq, queueEntry)
-                  add_and_get(message_refs_db, queueEntry.message_key, 1, tx)
-                }
+                entries_db.put(tx, (queueEntry.queue_key, queueEntry.entry_seq), queueEntry)
+                add_and_get(message_refs_db, queueEntry.message_key, 1, tx)
               }
 
               action.dequeues.foreach { queueEntry =>
-                with_entries_db(queueEntry.queue_key) { entries_db=>
-                  entries_db.delete(tx, queueEntry.entry_seq)
-                  decrement_message_reference(ctx, queueEntry.message_key)
-                }
+                entries_db.delete(tx, (queueEntry.queue_key, queueEntry.entry_seq))
+                decrement_message_reference(ctx, queueEntry.message_key)
               }
           }
       }
@@ -351,21 +382,19 @@ class BDBClient(store: BDBStore) {
     var rc = ListBuffer[QueueEntryRange]()
     with_ctx { ctx=>
       import ctx._
+      var group:QueueEntryRange = null
 
-      with_entries_db(queue_key) { entries_db=>
-
-        var group:QueueEntryRange = null
-
-        entries_db.cursor(tx) { (key, value) =>
-
+      entries_db.cursor_from(tx, (queue_key, 0L)) { (key, value) =>
+        val current_key:(Long,Long)= key
+        if( current_key._1 == queue_key ) {
           if( group == null ) {
             group = new QueueEntryRange
-            group.first_entry_seq = key
+            group.first_entry_seq = current_key._2
           }
 
           val entry:QueueEntryRecord = value
 
-          group.last_entry_seq = key
+          group.last_entry_seq = current_key._2
           group.count += 1
           group.size += entry.size
 
@@ -375,13 +404,16 @@ class BDBClient(store: BDBStore) {
           }
 
           true // to continue cursoring.
-        }
 
-        if( group!=null ) {
-          rc += group
+        } else {
+          false
         }
+      }
 
+      if( group!=null ) {
+        rc += group
       }
+
     }
     rc
   }
@@ -390,13 +422,17 @@ class BDBClient(store: BDBStore) {
     var rc = ListBuffer[QueueEntryRecord]()
     with_ctx { ctx=>
       import ctx._
+      entries_db.cursor_from(tx, (queue_key, firstSeq)) { (key, value) =>
+        val current_key:(Long,Long) = key
+        if( current_key._1 == queue_key ) {
 
-      with_entries_db(queue_key) { entries_db=>
-        entries_db.cursor_from(tx, to_database_entry(firstSeq)) { (key, value) =>
-          val entry_seq:Long = key
+          val entry_seq = current_key._2
           val entry:QueueEntryRecord = value
           rc += entry
           entry_seq < lastSeq
+
+        } else {
+          false
         }
       }
     }
@@ -406,11 +442,41 @@ class BDBClient(store: BDBStore) {
   val metric_load_from_index_counter = new TimeCounter
   var metric_load_from_index = metric_load_from_index_counter(false)
 
-  def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
+  def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]):Unit = {
+
+    val missing = with_ctx { ctx=>
+      import ctx._
+      requests.flatMap { x =>
+        val (message_key, callback) = x
+        val record = metric_load_from_index_counter.time {
+          messages_db.get(tx, to_database_entry(message_key)).map{ data=>
+            import PBSupport._
+            val pb:MessagePB.Buffer = data
+            val rc = from_pb(pb)
+            if( pb.hasZcpFile ) {
+              rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
+            }
+            rc
+          }
+        }
+        if( record.isDefined ) {
+          callback(record)
+          None
+        } else {
+          Some(x)
+        }
+      }
+    }
+
+    if (missing.isEmpty)
+      return
 
+    // There's a small chance that a message was missing; perhaps we started a read tx before the
+    // write tx completed.  Let's try again.
     with_ctx { ctx=>
       import ctx._
-      requests.foreach { case (message_key, callback)=>
+      missing.foreach { x =>
+        val (message_key, callback) = x
         val record = metric_load_from_index_counter.time {
           messages_db.get(tx, to_database_entry(message_key)).map{ data=>
             import PBSupport._
@@ -478,15 +544,9 @@ class BDBClient(store: BDBStore) {
         }
 
         streams.using_queue_entry_stream { queue_entry_stream=>
-          queues_db.cursor(tx) { (_, value) =>
-            val record:QueueRecord = value
-            with_entries_db(record.key) { entries_db=>
-              entries_db.cursor(tx) { (key, value) =>
-                val record:QueueEntryRecord = value
-                record.writeFramed(queue_entry_stream)
-                true
-              }
-            }
+          entries_db.cursor(tx) { (key, value) =>
+            val record:QueueEntryRecord = value
+            record.writeFramed(queue_entry_stream)
             true
           }
         }
@@ -523,8 +583,6 @@ class BDBClient(store: BDBStore) {
           foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb=>
             val record:QueueRecord = pb
             queues_db.put(tx, record.key, record)
-            with_entries_db(record.key) { entriesdb=>
-            }
           }
         }
 
@@ -557,10 +615,8 @@ class BDBClient(store: BDBStore) {
           foreach[QueueEntryPB.Buffer](queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
             val record:QueueEntryRecord = pb
 
-            with_entries_db(record.queue_key) { entries_db=>
-              entries_db.put(tx, record.entry_seq, record)
-              add_and_get(message_refs_db, record.message_key, 1, tx)
-            }
+            entries_db.put(tx, (record.queue_key, record.entry_seq), record)
+            add_and_get(message_refs_db, record.message_key, 1, tx)
           }
         }
       }

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1130122&r1=1130121&r2=1130122&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala Wed Jun  1 12:09:44 2011
@@ -20,9 +20,9 @@ import java.util.Comparator
 import java.nio.ByteBuffer
 import com.sleepycat.je._
 import java.io.Serializable
-import org.fusesource.hawtbuf.Buffer
 import org.apache.activemq.apollo.broker.store._
 import PBSupport._
+import org.fusesource.hawtbuf._
 
 object HelperTrait {
 
@@ -35,26 +35,64 @@ object HelperTrait {
   implicit def to_queue_record(entry: DatabaseEntry): QueueRecord = entry.getData
   implicit def to_database_entry(v: QueueRecord): DatabaseEntry = new DatabaseEntry(v)
 
-  implicit def encode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = {
-    val e = new Buffer(entry.getData).bigEndianEditor()
-    (e.readInt, e.readLong, e.readInt)
-  }
-  implicit def decode_zcp_value(v: (Int,Long,Int)): DatabaseEntry = {
-    val buf = new Buffer(16)
-    val e = buf.bigEndianEditor()
-    e.writeInt(v._1)
-    e.writeLong(v._2)
-    e.writeInt(v._1)
-    new DatabaseEntry(buf.data)
+  implicit def decode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = {
+    val in = new DataByteArrayInputStream(entry.getData)
+    (in.readVarInt(), in.readVarLong(), in.readVarInt())
+  }
+  implicit def encode_zcp_value(v: (Int,Long,Int)): DatabaseEntry = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarIntSize(v._1) +
+      AbstractVarIntSupport.computeVarLongSize(v._2) +
+      AbstractVarIntSupport.computeVarIntSize(v._3)
+    )
+    out.writeVarInt(v._1)
+    out.writeVarLong(v._2)
+    out.writeVarInt(v._3)
+    new DatabaseEntry(out.toBuffer.data)
+  }
+
+  implicit def to_bytes(l:Long):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(AbstractVarIntSupport.computeVarLongSize(l))
+    out.writeVarLong(l)
+    out.toBuffer.data
+  }
+  implicit def to_long(bytes:Array[Byte]):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readVarLong()
   }
 
-  implicit def to_bytes(l:Long):Array[Byte] = ByteBuffer.wrap(new Array[Byte](8)).putLong(l).array()
-  implicit def to_long(bytes:Array[Byte]):Long = ByteBuffer.wrap(bytes).getLong()
   implicit def to_database_entry(l:Long):DatabaseEntry = new DatabaseEntry(to_bytes(l))
   implicit def to_long(value:DatabaseEntry):Long = to_long(value.getData)
 
-  implicit def to_bytes(l:Int):Array[Byte] = ByteBuffer.wrap(new Array[Byte](4)).putInt(l).array()
-  implicit def to_int(bytes:Array[Byte]):Int = ByteBuffer.wrap(bytes).getInt()
+  implicit def to_bytes(l:(Long, Long)):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(l._1)+
+      AbstractVarIntSupport.computeVarLongSize(l._2)
+    )
+    out.writeVarLong(l._1)
+    out.writeVarLong(l._2)
+    out.toBuffer.data
+  }
+
+  implicit def to_long_long(bytes:Array[Byte]):(Long,Long) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readVarLong(), in.readVarLong())
+  }
+
+  implicit def to_database_entry(l:(Long,Long)):DatabaseEntry = new DatabaseEntry(to_bytes(l))
+  implicit def to_long_long(value:DatabaseEntry):(Long,Long) = to_long_long(value.getData)
+
+  implicit def to_bytes(l:Int):Array[Byte] = {
+    val out = new DataByteArrayOutputStream(AbstractVarIntSupport.computeVarIntSize(l))
+    out.writeVarInt(l)
+    out.toBuffer.data
+  }
+
+  implicit def to_int(bytes:Array[Byte]):Int = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readVarInt()
+  }
+
   implicit def to_database_entry(l:Int):DatabaseEntry = new DatabaseEntry(to_bytes(l))
   implicit def to_int(value:DatabaseEntry):Int = to_int(value.getData)
 
@@ -63,18 +101,39 @@ object HelperTrait {
   class LongComparator extends Comparator[Array[Byte]] with Serializable {
 
     def compare(o1: Array[Byte], o2: Array[Byte]) = {
-        val v1:java.lang.Long = to_long(o1)
-        val v2:java.lang.Long = to_long(o2)
+        val v1 = to_long(o1)
+        val v2 = to_long(o2)
         v1.compareTo(v2)
     }
     
   }
 
+  @SerialVersionUID(2)
+  class LongLongComparator extends Comparator[Array[Byte]] with Serializable {
+
+    def compare(o1: Array[Byte], o2: Array[Byte]) = {
+      val v1 = to_long_long(o1)
+      val v2 = to_long_long(o2)
+      val rc = v1._1.compareTo(v2._1)
+      if (rc==0) {
+        v1._2.compareTo(v2._2)
+      } else {
+        rc
+      }
+    }
+
+  }
+
   val long_key_conf = new DatabaseConfig();
   long_key_conf.setAllowCreate(true)
   long_key_conf.setTransactional(true);
   long_key_conf.setBtreeComparator(new LongComparator)
 
+  val long_long_key_conf = new DatabaseConfig();
+  long_long_key_conf.setAllowCreate(true)
+  long_long_key_conf.setTransactional(true);
+  long_long_key_conf.setBtreeComparator(new LongLongComparator)
+
   final class RichDatabase(val db: Database) extends Proxy {
     def self: Any = db
 

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala?rev=1130122&r1=1130121&r2=1130122&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala Wed Jun  1 12:09:44 2011
@@ -28,7 +28,7 @@ class BDBStoreTest extends StoreFunSuite
   def create_store(flushDelay:Long):Store = {
     val rc = new BDBStore({
       val rc = new BDBStoreDTO
-      rc.directory = basedir / "activemq-data"
+      rc.directory = basedir / "target" / "apollo-data"
       rc
     })
     rc.config.flush_delay = flushDelay

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala?rev=1130122&r1=1130121&r2=1130122&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala Wed Jun  1 12:09:44 2011
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.broker.store
 
-;
-
 import org.fusesource.hawtbuf.AsciiBuffer;
 import org.fusesource.hawtbuf.Buffer;
 

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1130122&r1=1130121&r2=1130122&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Wed Jun  1 12:09:44 2011
@@ -97,7 +97,7 @@
     <felix-version>1.0.0</felix-version>
 
     <hawtdispatch-version>1.3-SNAPSHOT</hawtdispatch-version>
-    <hawtbuf-version>1.4</hawtbuf-version>
+    <hawtbuf-version>1.5-SNAPSHOT</hawtbuf-version>
     
     <jdbm-version>2.0.1</jdbm-version>
     <bdb-version>4.1.6</bdb-version>