You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/07 21:43:08 UTC

svn commit: r1032385 - /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Author: chirino
Date: Sun Nov  7 20:43:08 2010
New Revision: 1032385

URL: http://svn.apache.org/viewvc?rev=1032385&view=rev
Log:
Use more consistent receipt handling. Make sure we switch to the connection's dispatch context before using the connection (new asserts were showing this was not always the case).

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1032385&r1=1032384&r2=1032385&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Sun Nov  7 20:43:08 2010
@@ -667,7 +667,7 @@ class StompProtocolHandler extends Proto
 
       if( receipt!=null ) {
         delivery.ack = { storeTx =>
-          dispatchQueue {
+          dispatchQueue <<| ^{
             connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
           }
         }
@@ -692,8 +692,6 @@ class StompProtocolHandler extends Proto
   }
 
   def on_stomp_subscribe(headers:HeaderMap):Unit = {
-    val receipt = get(headers, RECEIPT_REQUESTED)
-
     val dest = get(headers, DESTINATION) match {
       case Some(dest)=> dest
       case None=>
@@ -780,9 +778,7 @@ class StompProtocolHandler extends Proto
 
       // consumer is bind bound as a topic
       host.router.bind(destination, consumer, ^{
-        receipt.foreach{ receipt =>
-          connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
-        }
+        send_receipt(headers)
       })
       consumer.release
 
@@ -793,9 +789,7 @@ class StompProtocolHandler extends Proto
         x match {
           case Some(queue:Queue) =>
             queue.bind(consumer::Nil)
-            receipt.foreach{ receipt =>
-              connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
-            }
+            send_receipt(headers)
             consumer.release
           case None => throw new RuntimeException("case not yet implemented.")
         }
@@ -805,7 +799,6 @@ class StompProtocolHandler extends Proto
 
   def on_stomp_unsubscribe(headers:HeaderMap):Unit = {
 
-    val receipt = get(headers, RECEIPT_REQUESTED)
     var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
 
     val id = get(headers, ID).getOrElse {
@@ -832,9 +825,7 @@ class StompProtocolHandler extends Proto
         // consumer.close
         if( consumer.binding==null ) {
           host.router.unbind(consumer.destination, consumer)
-          receipt.foreach{ receipt =>
-            connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
-          }
+          send_receipt(headers)
         } else {
           host.router.get_queue(consumer.binding) { queue=>
             queue.foreach( _.unbind(consumer::Nil) )
@@ -842,14 +833,10 @@ class StompProtocolHandler extends Proto
 
           if( persistent && consumer.binding!=null ) {
             host.router.destroy_queue(consumer.binding){sucess=>
-              receipt.foreach{ receipt =>
-                connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
-              }
+              send_receipt(headers)
             }
           } else {
-            receipt.foreach{ receipt =>
-              connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
-            }
+            send_receipt(headers)
           }
 
         }
@@ -898,10 +885,7 @@ class StompProtocolHandler extends Proto
               get_or_create_tx_queue(txid){ _.add(frame, (uow)=>{ handler.perform_ack(messageId, uow)} ) }
           }
 
-          get(headers, RECEIPT_REQUESTED).foreach { receipt =>
-            connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
-          }
-
+          send_receipt(headers)
         }
 
 
@@ -959,7 +943,9 @@ class StompProtocolHandler extends Proto
   def send_receipt(headers:HeaderMap) = {
     get(headers, RECEIPT_REQUESTED) match {
       case Some(receipt)=>
-        connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+        dispatchQueue <<| ^{
+          connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+        }
       case None=>
     }
   }