You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/07/31 17:30:59 UTC

svn commit: r1508925 - /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Author: chirino
Date: Wed Jul 31 15:30:59 2013
New Revision: 1508925

URL: http://svn.apache.org/r1508925
Log:
Have the stomp protocol keep better track of in progress commits to aid in debugging.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1508925&r1=1508924&r2=1508925&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Jul 31 15:30:59 2013
@@ -1212,9 +1212,14 @@ class StompProtocolHandler extends Proto
           case None=>
             perform_send(frame)
           case Some(txid)=>
-            get_or_create_tx_queue(txid).add { uow=>
-              perform_send(frame, uow)
-            }
+            get_or_create_tx_queue(txid).add (new TransactionAction(){
+              override def on_commit(uow: StoreUOW) {
+                perform_send(frame, uow)
+              }
+              override def toString: String = {
+                "send to: "+dest
+              }
+            })
         }
 
     }
@@ -1715,11 +1720,17 @@ class StompProtocolHandler extends Proto
             handler.perform_ack(consumed, messageId, null)
           case Some(txid)=>
             handler.consumer.retain()
-            get_or_create_tx_queue(txid).add({ uow=>
-              handler.perform_ack(consumed, messageId, uow)
-              handler.consumer.release()
-            }, ()=>{
-              handler.consumer.release()
+            get_or_create_tx_queue(txid).add (new TransactionAction(){
+              override def on_commit(uow: StoreUOW) {
+                handler.perform_ack(consumed, messageId, uow)
+                handler.consumer.release()
+              }
+              override def on_rollback() {
+                handler.consumer.release()
+              }
+              override def toString: String = {
+                "ack: "+messageId
+              }
             })
         }
       }
@@ -1737,7 +1748,10 @@ class StompProtocolHandler extends Proto
   }
 
   def on_stomp_commit(headers:HeaderMap) = {
-    remove_tx_queue(require_transaction_header(headers)).commit {
+    val txid = require_transaction_header(headers)
+    val tx = transactions.get(txid).getOrElse(die("transaction not active: %d".format(txid)))
+    tx.commit {
+      remove_tx_queue(txid)
       send_receipt(headers)
     }
   }
@@ -1767,25 +1781,29 @@ class StompProtocolHandler extends Proto
     frame
   }
 
+  class TransactionAction {
+    def on_commit(uow:StoreUOW):Unit = {}
+    def on_rollback() = {}
+  }
 
   class TransactionQueue {
     // TODO: eventually we want to back this /w a broker Queue which
     // can provides persistence and memory swapping.
 
-    val queue = ListBuffer[((StoreUOW)=>Unit, ()=>Unit)]()
-
+    val queue = ListBuffer[TransactionAction]()
+    var uow:StoreUOW = _
 
     override def toString: String = {
-      "{ actions: "+queue.size+" }"
+      "{ uow: "+uow+", actions: "+queue+" }"
     }
 
-    def add(on_commit:(StoreUOW)=>Unit, on_rollback:()=>Unit=null):Unit = {
-      queue += ((on_commit, on_rollback))
+    def add(action:TransactionAction):Unit = {
+      queue += action
     }
 
     def commit(on_complete: => Unit) = {
       if( host.store!=null ) {
-        val uow = host.store.create_uow
+        uow = host.store.create_uow
 //        println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
         uow.on_complete {
 //          println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
@@ -1793,20 +1811,16 @@ class StompProtocolHandler extends Proto
             on_complete
           }
         }
-        queue.foreach{ _._1(uow) }
+        queue.foreach{ _.on_commit(uow) }
         uow.release
       } else {
-        queue.foreach{ _._1(null) }
+        queue.foreach{ _.on_commit(null) }
         on_complete
       }
     }
 
     def rollback = {
-      queue.foreach{ case (x, y) =>
-        if( y != null ) {
-          y()
-        }
-      }
+      queue.foreach{ _.on_rollback() }
     }
 
   }