You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/07 21:43:08 UTC
svn commit: r1032385 -
/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Author: chirino
Date: Sun Nov 7 20:43:08 2010
New Revision: 1032385
URL: http://svn.apache.org/viewvc?rev=1032385&view=rev
Log:
Use more consistent receipt handling. Make sure we switch to the connection's dispatch context before using the connection (new asserts were showing this was not always the case.)
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1032385&r1=1032384&r2=1032385&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Sun Nov 7 20:43:08 2010
@@ -667,7 +667,7 @@ class StompProtocolHandler extends Proto
if( receipt!=null ) {
delivery.ack = { storeTx =>
- dispatchQueue {
+ dispatchQueue <<| ^{
connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
}
}
@@ -692,8 +692,6 @@ class StompProtocolHandler extends Proto
}
def on_stomp_subscribe(headers:HeaderMap):Unit = {
- val receipt = get(headers, RECEIPT_REQUESTED)
-
val dest = get(headers, DESTINATION) match {
case Some(dest)=> dest
case None=>
@@ -780,9 +778,7 @@ class StompProtocolHandler extends Proto
// consumer is bind bound as a topic
host.router.bind(destination, consumer, ^{
- receipt.foreach{ receipt =>
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
+ send_receipt(headers)
})
consumer.release
@@ -793,9 +789,7 @@ class StompProtocolHandler extends Proto
x match {
case Some(queue:Queue) =>
queue.bind(consumer::Nil)
- receipt.foreach{ receipt =>
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
+ send_receipt(headers)
consumer.release
case None => throw new RuntimeException("case not yet implemented.")
}
@@ -805,7 +799,6 @@ class StompProtocolHandler extends Proto
def on_stomp_unsubscribe(headers:HeaderMap):Unit = {
- val receipt = get(headers, RECEIPT_REQUESTED)
var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
val id = get(headers, ID).getOrElse {
@@ -832,9 +825,7 @@ class StompProtocolHandler extends Proto
// consumer.close
if( consumer.binding==null ) {
host.router.unbind(consumer.destination, consumer)
- receipt.foreach{ receipt =>
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
+ send_receipt(headers)
} else {
host.router.get_queue(consumer.binding) { queue=>
queue.foreach( _.unbind(consumer::Nil) )
@@ -842,14 +833,10 @@ class StompProtocolHandler extends Proto
if( persistent && consumer.binding!=null ) {
host.router.destroy_queue(consumer.binding){sucess=>
- receipt.foreach{ receipt =>
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
+ send_receipt(headers)
}
} else {
- receipt.foreach{ receipt =>
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
+ send_receipt(headers)
}
}
@@ -898,10 +885,7 @@ class StompProtocolHandler extends Proto
get_or_create_tx_queue(txid){ _.add(frame, (uow)=>{ handler.perform_ack(messageId, uow)} ) }
}
- get(headers, RECEIPT_REQUESTED).foreach { receipt =>
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
-
+ send_receipt(headers)
}
@@ -959,7 +943,9 @@ class StompProtocolHandler extends Proto
def send_receipt(headers:HeaderMap) = {
get(headers, RECEIPT_REQUESTED) match {
case Some(receipt)=>
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+ dispatchQueue <<| ^{
+ connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+ }
case None=>
}
}