You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/07/31 17:30:59 UTC
svn commit: r1508925 -
/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Author: chirino
Date: Wed Jul 31 15:30:59 2013
New Revision: 1508925
URL: http://svn.apache.org/r1508925
Log:
Have the stomp protocol keep better track of in progress commits to aid in debugging.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1508925&r1=1508924&r2=1508925&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Jul 31 15:30:59 2013
@@ -1212,9 +1212,14 @@ class StompProtocolHandler extends Proto
case None=>
perform_send(frame)
case Some(txid)=>
- get_or_create_tx_queue(txid).add { uow=>
- perform_send(frame, uow)
- }
+ get_or_create_tx_queue(txid).add (new TransactionAction(){
+ override def on_commit(uow: StoreUOW) {
+ perform_send(frame, uow)
+ }
+ override def toString: String = {
+ "send to: "+dest
+ }
+ })
}
}
@@ -1715,11 +1720,17 @@ class StompProtocolHandler extends Proto
handler.perform_ack(consumed, messageId, null)
case Some(txid)=>
handler.consumer.retain()
- get_or_create_tx_queue(txid).add({ uow=>
- handler.perform_ack(consumed, messageId, uow)
- handler.consumer.release()
- }, ()=>{
- handler.consumer.release()
+ get_or_create_tx_queue(txid).add (new TransactionAction(){
+ override def on_commit(uow: StoreUOW) {
+ handler.perform_ack(consumed, messageId, uow)
+ handler.consumer.release()
+ }
+ override def on_rollback() {
+ handler.consumer.release()
+ }
+ override def toString: String = {
+ "ack: "+messageId
+ }
})
}
}
@@ -1737,7 +1748,10 @@ class StompProtocolHandler extends Proto
}
def on_stomp_commit(headers:HeaderMap) = {
- remove_tx_queue(require_transaction_header(headers)).commit {
+ val txid = require_transaction_header(headers)
+ val tx = transactions.get(txid).getOrElse(die("transaction not active: %d".format(txid)))
+ tx.commit {
+ remove_tx_queue(txid)
send_receipt(headers)
}
}
@@ -1767,25 +1781,29 @@ class StompProtocolHandler extends Proto
frame
}
+ class TransactionAction {
+ def on_commit(uow:StoreUOW):Unit = {}
+ def on_rollback() = {}
+ }
class TransactionQueue {
// TODO: eventually we want to back this /w a broker Queue which
// can provides persistence and memory swapping.
- val queue = ListBuffer[((StoreUOW)=>Unit, ()=>Unit)]()
-
+ val queue = ListBuffer[TransactionAction]()
+ var uow:StoreUOW = _
override def toString: String = {
- "{ actions: "+queue.size+" }"
+ "{ uow: "+uow+", actions: "+queue+" }"
}
- def add(on_commit:(StoreUOW)=>Unit, on_rollback:()=>Unit=null):Unit = {
- queue += ((on_commit, on_rollback))
+ def add(action:TransactionAction):Unit = {
+ queue += action
}
def commit(on_complete: => Unit) = {
if( host.store!=null ) {
- val uow = host.store.create_uow
+ uow = host.store.create_uow
// println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
uow.on_complete {
// println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
@@ -1793,20 +1811,16 @@ class StompProtocolHandler extends Proto
on_complete
}
}
- queue.foreach{ _._1(uow) }
+ queue.foreach{ _.on_commit(uow) }
uow.release
} else {
- queue.foreach{ _._1(null) }
+ queue.foreach{ _.on_commit(null) }
on_complete
}
}
def rollback = {
- queue.foreach{ case (x, y) =>
- if( y != null ) {
- y()
- }
- }
+ queue.foreach{ _.on_rollback() }
}
}