You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/10 19:51:08 UTC
svn commit: r1156274 - in /activemq/activemq-apollo/trunk:
apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/br...
Author: chirino
Date: Wed Aug 10 17:51:07 2011
New Revision: 1156274
URL: http://svn.apache.org/viewvc?rev=1156274&view=rev
Log:
Extend the Store interface so that the store can avoid disk syncs if they are not required.
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1156274&r1=1156273&r2=1156274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Wed Aug 10 17:51:07 2011
@@ -24,12 +24,13 @@ import java.util.concurrent.atomic.Atomi
import collection.mutable.ListBuffer
import org.apache.activemq.apollo.broker.store._
import org.apache.activemq.apollo.util._
-import com.sleepycat.je._
import java.io.{EOFException, InputStream, OutputStream}
import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory}
import org.apache.activemq.apollo.util.Log._
import scala.Some
import java.sql.ClientInfoStatus
+import com.sleepycat.je._
+import javax.management.remote.rmi._RMIConnection_Stub
object BDBClient extends Log
/**
@@ -84,7 +85,7 @@ class BDBClient(store: BDBStore) {
environment = new Environment(directory, env_config);
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
messages_db
message_refs_db
@@ -183,7 +184,7 @@ class BDBClient(store: BDBStore) {
}
- def with_ctx[T](func: (TxContext) => T): T = {
+ def with_ctx[T](sync:Boolean=true)(func: (TxContext) => T): T = {
var error:Throwable = null
var rc:Option[T] = None
@@ -192,7 +193,12 @@ class BDBClient(store: BDBStore) {
while(!rc.isDefined) {
- val ctx = TxContext(environment.beginTransaction(null, null));
+ val ctx = if(sync) {
+ TxContext(environment.beginTransaction(null, null));
+ } else {
+ TxContext(environment.beginTransaction(null, new TransactionConfig().setDurability(Durability.COMMIT_NO_SYNC)))
+ }
+
try {
rc = Some(func(ctx))
} catch {
@@ -222,7 +228,7 @@ class BDBClient(store: BDBStore) {
def purge() = {
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
@@ -270,7 +276,7 @@ class BDBClient(store: BDBStore) {
}
def addQueue(record: QueueRecord, callback:Runnable) = {
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
queues_db.put(tx, record.key, record)
}
@@ -292,7 +298,7 @@ class BDBClient(store: BDBStore) {
}
def removeQueue(queue_key: Long, callback:Runnable) = {
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
queues_db.delete(tx, queue_key)
@@ -313,7 +319,8 @@ class BDBClient(store: BDBStore) {
}
def store(uows: Seq[BDBStore#DelayableUOW], callback:Runnable) {
- with_ctx { ctx=>
+ val sync = uows.find( ! _.complete_listeners.isEmpty ).isDefined
+ with_ctx(sync) { ctx=>
import ctx._
var zcp_files_to_sync = Set[Int]()
uows.foreach { uow =>
@@ -360,7 +367,7 @@ class BDBClient(store: BDBStore) {
def listQueues: Seq[Long] = {
val rc = ListBuffer[Long]()
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
queues_db.cursor(tx) { (key, _) =>
@@ -373,7 +380,7 @@ class BDBClient(store: BDBStore) {
}
def getQueue(queue_key: Long): Option[QueueRecord] = {
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
queues_db.get(tx, to_database_entry(queue_key)).map( x=> to_queue_record(x) )
}
@@ -381,7 +388,7 @@ class BDBClient(store: BDBStore) {
def listQueueEntryGroups(queue_key: Long, limit: Int) : Seq[QueueEntryRange] = {
var rc = ListBuffer[QueueEntryRange]()
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
var group:QueueEntryRange = null
@@ -429,7 +436,7 @@ class BDBClient(store: BDBStore) {
def getQueueEntries(queue_key: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
var rc = ListBuffer[QueueEntryRecord]()
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
entries_db.cursor_from(tx, (queue_key, firstSeq)) { (key, value) =>
val current_key:(Long,Long) = key
@@ -453,7 +460,7 @@ class BDBClient(store: BDBStore) {
def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]):Unit = {
- val missing = with_ctx { ctx=>
+ val missing = with_ctx() { ctx=>
import ctx._
requests.flatMap { x =>
val (message_key, callback) = x
@@ -482,7 +489,7 @@ class BDBClient(store: BDBStore) {
// There's a small chance that a message was missing, perhaps we started a read tx, before the
// write tx completed. Lets try again..
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
missing.foreach { x =>
val (message_key, callback) = x
@@ -504,7 +511,7 @@ class BDBClient(store: BDBStore) {
def getLastMessageKey:Long = {
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
messages_db.last_key(tx).map(to_long _).getOrElse(0)
@@ -512,7 +519,7 @@ class BDBClient(store: BDBStore) {
}
def getLastQueueKey:Long = {
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
queues_db.last_key(tx).map(to_long _).getOrElse(0)
@@ -521,7 +528,7 @@ class BDBClient(store: BDBStore) {
def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
try {
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
import PBSupport._
@@ -584,7 +591,7 @@ class BDBClient(store: BDBStore) {
} while( !done )
}
- with_ctx { ctx=>
+ with_ctx() { ctx=>
import ctx._
import PBSupport._
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1156274&r1=1156273&r2=1156274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Aug 10 17:51:07 2011
@@ -1113,7 +1113,7 @@ class QueueEntry(val queue:Queue, val se
if(!storing) {
storing = true
delivery.uow.enqueue(toQueueEntryRecord)
- delivery.uow.on_complete {
+ delivery.uow.on_flush {
queue.swap_out_completes_source.merge(this)
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1156274&r1=1156273&r2=1156274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Wed Aug 10 17:51:07 2011
@@ -62,7 +62,6 @@ trait DelayingStoreSupport extends Store
var dispose_start:Long = 0
var flushing = false;
- var status = "init"
class MessageAction {
@@ -84,8 +83,23 @@ trait DelayingStoreSupport extends Store
var completed = false
var complete_listeners = ListBuffer[() => Unit]()
+ var flushed = false
+ var flush_listeners = ListBuffer[() => Unit]()
var disable_delay = false
+ def on_flush(callback: =>Unit) = {
+ if( this.synchronized {
+ if( flushed ) {
+ true
+ } else {
+ flush_listeners += ( ()=> callback )
+ false
+ }
+ }) {
+ callback
+ }
+ }
+
def on_complete(callback: =>Unit) = {
if( this.synchronized {
if( completed ) {
@@ -113,10 +127,10 @@ trait DelayingStoreSupport extends Store
}
def cancel = {
- status += "|cancel"
+ dispatch_queue.assertExecuting()
flushing = true
delayed_uows.remove(uow_id)
- on_performed
+ on_completed
}
def store(record: MessageRecord):Long = {
@@ -165,16 +179,25 @@ trait DelayingStoreSupport extends Store
}
override def dispose = {
- status += "|commited"
dispose_start = System.nanoTime
uow_source.merge(this)
}
- def on_performed() = this.synchronized {
- status += "|performed"
- commit_latency_counter += System.nanoTime-dispose_start
- complete_listeners.foreach(_())
- super.dispose
+ def on_flushed() = this.synchronized {
+ if( !flushed ) {
+ flushed = true
+ flush_listeners.foreach(_())
+ }
+ }
+
+ def on_completed() = this.synchronized {
+ if ( !completed ) {
+ on_flushed
+ completed = true
+ commit_latency_counter += System.nanoTime-dispose_start
+ complete_listeners.foreach(_())
+ super.dispose
+ }
}
}
@@ -189,7 +212,6 @@ trait DelayingStoreSupport extends Store
pending_stores.get(message_key) match {
case null => cb()
case action =>
- action.uow.status += "|flush_message"
action.uow.on_complete( cb() )
flush(action.uow)
}
@@ -279,7 +301,6 @@ trait DelayingStoreSupport extends Store
dispatch_queue.assertExecuting()
uow_source.getData.foreach { uow =>
- uow.status += "|delayed"
delayed_uows.put(uow.uow_id, uow)
uow.actions.foreach { case (msg, action) =>
@@ -341,7 +362,6 @@ trait DelayingStoreSupport extends Store
if( uow!=null && !uow.flushing ) {
uow.flushing = true
delayed_uows.remove(uow.uow_id)
- uow.status += "|flushing"
flush_source.merge(uow)
}
}
@@ -365,11 +385,11 @@ trait DelayingStoreSupport extends Store
flush_latency_counter.start { end=>
flush_source.suspend
store(uows) {
- dispatch_queue.assertExecuting()
+ store_completed(uows)
+
flush_source.resume
- end()
+ dispatch_queue.assertExecuting()
uows.foreach { uow=>
- uow.status += "|flushed"
uow.actions.foreach { case (msg, action) =>
if( action.message_record !=null ) {
metric_flushed_message_counter += 1
@@ -377,16 +397,21 @@ trait DelayingStoreSupport extends Store
}
action.enqueues.foreach { queue_entry=>
metric_flushed_enqueue_counter += 1
- val k = key(queue_entry)
- pending_enqueues.remove(k)
+ pending_enqueues.remove(key(queue_entry))
}
}
- uow.on_performed
-
}
+ end()
}
}
}
}
+ def store_completed(uows: ListBuffer[DelayingStoreSupport.this.type#DelayableUOW]) {
+ uows.foreach { uow =>
+ uow.on_completed
+ }
+ }
+
+
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1156274&r1=1156273&r2=1156274&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala Wed Aug 10 17:51:07 2011
@@ -67,7 +67,15 @@ trait StoreUOW extends Retained {
/**
* The specified callback is executed once the UOW
- * is completed.
+ * has been written to disk and flushed out of the application
+ * buffers.
+ */
+ def on_flush(callback: =>Unit)
+
+ /**
+ * The specified callback is executed once the UOW
+ * has fully completed, that is, it's been flushed
+ * and synced to disk.
*/
def on_complete(callback: =>Unit)