You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/22 00:17:59 UTC

svn commit: r1364183 - /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Author: chirino
Date: Sat Jul 21 22:17:59 2012
New Revision: 1364183

URL: http://svn.apache.org/viewvc?rev=1364183&view=rev
Log:
Fix for APLO-218: Unexpected "Transport listener failure" when a client sends a transacted (TXed) ACK after unsubscribing.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1364183&r1=1364182&r2=1364183&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sat Jul 21 22:17:59 2012
@@ -209,6 +209,7 @@ class StompProtocolHandler extends Proto
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit
       def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
       def close:Unit
+      def consumer = StompConsumer.this
     }
 
     class AutoAckHandler extends AckHandler {
@@ -1402,10 +1403,10 @@ class StompProtocolHandler extends Proto
         
       case Some(consumer)=>
         // consumer gets disposed after all producer stop sending to it...
-        consumer.setDisposer(^{ send_receipt(headers) })
         consumers -= id
         host.dispatch_queue {
           host.router.unbind(consumer.addresses, consumer, persistent, security_context)
+          send_receipt(headers)
         }
     }
   }
@@ -1455,9 +1456,13 @@ class StompProtocolHandler extends Proto
           case None=>
             handler.perform_ack(consumed, messageId, null)
           case Some(txid)=>
-            get_or_create_tx_queue(txid).add{ uow=>
+            handler.consumer.retain()
+            get_or_create_tx_queue(txid).add({ uow=>
               handler.perform_ack(consumed, messageId, uow)
-            }
+              handler.consumer.release()
+            }, ()=>{
+              handler.consumer.release()
+            })
         }
       }
       send_receipt(headers)
@@ -1512,10 +1517,10 @@ class StompProtocolHandler extends Proto
     // TODO: eventually we want to back this /w a broker Queue which
     // can provides persistence and memory swapping.
 
-    val queue = ListBuffer[(StoreUOW)=>Unit]()
+    val queue = ListBuffer[((StoreUOW)=>Unit, ()=>Unit)]()
 
-    def add(proc:(StoreUOW)=>Unit):Unit = {
-      queue += proc
+    def add(on_commit:(StoreUOW)=>Unit, on_rollback:()=>Unit=null):Unit = {
+      queue += ((on_commit, on_rollback))
     }
 
     def commit(on_complete: => Unit) = {
@@ -1526,16 +1531,16 @@ class StompProtocolHandler extends Proto
 //          println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
           on_complete
         }
-        queue.foreach{ _(uow) }
+        queue.foreach{ _._1(uow) }
         uow.release
       } else {
-        queue.foreach{ _(null) }
+        queue.foreach{ _._1(null) }
         on_complete
       }
     }
 
     def rollback = {
-      queue.clear
+      queue.foreach{ _._2() }
     }
 
   }