You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/22 00:17:59 UTC
svn commit: r1364183 -
/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Author: chirino
Date: Sat Jul 21 22:17:59 2012
New Revision: 1364183
URL: http://svn.apache.org/viewvc?rev=1364183&view=rev
Log:
Fix for APLO-218: Unexpected "Transport listener failure" when you send a TXed ack after unsubscribing.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1364183&r1=1364182&r2=1364183&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sat Jul 21 22:17:59 2012
@@ -209,6 +209,7 @@ class StompProtocolHandler extends Proto
def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit
def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
def close:Unit
+ def consumer = StompConsumer.this
}
class AutoAckHandler extends AckHandler {
@@ -1402,10 +1403,10 @@ class StompProtocolHandler extends Proto
case Some(consumer)=>
// consumer gets disposed after all producer stop sending to it...
- consumer.setDisposer(^{ send_receipt(headers) })
consumers -= id
host.dispatch_queue {
host.router.unbind(consumer.addresses, consumer, persistent, security_context)
+ send_receipt(headers)
}
}
}
@@ -1455,9 +1456,13 @@ class StompProtocolHandler extends Proto
case None=>
handler.perform_ack(consumed, messageId, null)
case Some(txid)=>
- get_or_create_tx_queue(txid).add{ uow=>
+ handler.consumer.retain()
+ get_or_create_tx_queue(txid).add({ uow=>
handler.perform_ack(consumed, messageId, uow)
- }
+ handler.consumer.release()
+ }, ()=>{
+ handler.consumer.release()
+ })
}
}
send_receipt(headers)
@@ -1512,10 +1517,10 @@ class StompProtocolHandler extends Proto
// TODO: eventually we want to back this /w a broker Queue which
// can provides persistence and memory swapping.
- val queue = ListBuffer[(StoreUOW)=>Unit]()
+ val queue = ListBuffer[((StoreUOW)=>Unit, ()=>Unit)]()
- def add(proc:(StoreUOW)=>Unit):Unit = {
- queue += proc
+ def add(on_commit:(StoreUOW)=>Unit, on_rollback:()=>Unit=null):Unit = {
+ queue += ((on_commit, on_rollback))
}
def commit(on_complete: => Unit) = {
@@ -1526,16 +1531,16 @@ class StompProtocolHandler extends Proto
// println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
on_complete
}
- queue.foreach{ _(uow) }
+ queue.foreach{ _._1(uow) }
uow.release
} else {
- queue.foreach{ _(null) }
+ queue.foreach{ _._1(null) }
on_complete
}
}
def rollback = {
- queue.clear
+ queue.foreach{ _._2() }
}
}