You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/09/27 23:47:18 UTC

svn commit: r1001918 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Author: chirino
Date: Mon Sep 27 21:47:18 2010
New Revision: 1001918

URL: http://svn.apache.org/viewvc?rev=1001918&view=rev
Log:
Adding some initial support for transactions.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1001918&r1=1001917&r2=1001918&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Mon Sep 27 21:47:18 2010
@@ -413,11 +413,15 @@ case class DeliveryProducerRoute(val rou
 
       targets.foreach { target=>
 
-        // only delivery to matching consumers
+        // only deliver to matching consumers
         if( target.consumer.matches(delivery) ) {
 
           if( storeOnMatch ) {
-            delivery.uow = router.host.store.createStoreUOW
+            if( delivery.uow==null ) {
+              delivery.uow = router.host.store.createStoreUOW
+            } else {
+              delivery.uow.retain
+            }
             delivery.storeKey = delivery.uow.store(delivery.createMessageRecord)
             storeOnMatch = false
           }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1001918&r1=1001917&r2=1001918&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Mon Sep 27 21:47:18 2010
@@ -255,10 +255,18 @@ class StompProtocolHandler extends Proto
   override def onTransportCommand(command:Any) = {
     try {
       command match {
-        case StompFrame(Commands.SEND, headers, content, _) =>
+        case StompFrame(Commands.SEND, _, _, _) =>
           on_stomp_send(command.asInstanceOf[StompFrame])
         case StompFrame(Commands.ACK, headers, content, _) =>
-          on_stomp_ack(headers)
+          on_stomp_ack(command.asInstanceOf[StompFrame])
+
+        case StompFrame(Commands.BEGIN, headers, content, _) =>
+          on_stomp_begin(headers)
+        case StompFrame(Commands.COMMIT, headers, content, _) =>
+          on_stomp_commit(headers)
+        case StompFrame(Commands.ABORT, headers, content, _) =>
+          on_stomp_abort(headers)
+
         case StompFrame(Commands.SUBSCRIBE, headers, content, _) =>
           info("got command: %s", command)
           on_stomp_subscribe(headers)
@@ -299,46 +307,61 @@ class StompProtocolHandler extends Proto
   }
 
   def on_stomp_send(frame:StompFrame) = {
+
     get(frame.headers, Headers.Send.DESTINATION) match {
+      case None=>
+        frame.release
+        die("destination not set.")
+
       case Some(dest)=>
-        val destiantion:Destination = dest
 
-        producerRoutes.get(destiantion) match {
+        get(frame.headers, Headers.TRANSACTION) match {
           case None=>
-            // create the producer route...
+            perform_send(frame)
+          case Some(txid)=>
+            get_or_create_tx_queue(txid){ txqueue=>
+              txqueue.add(frame)
+            }
+        }
 
-            val producer = new DeliveryProducer() {
+    }
+  }
 
-              override def connection = Some( StompProtocolHandler.this.connection )
+  def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
 
-              override def dispatchQueue = queue
-            }
+    val destiantion: Destination = get(frame.headers, Headers.Send.DESTINATION).get
+    producerRoutes.get(destiantion) match {
+      case None =>
+        // create the producer route...
 
-            // don't process frames until producer is connected...
-            connection.transport.suspendRead
-            host.router.connect(destiantion, producer) { route =>
-                if( !connection.stopped ) {
-                  connection.transport.resumeRead
-                  route.refiller = ^{
-                    connection.transport.resumeRead
-                  }
-                  producerRoutes += destiantion->route
-                  send_via_route(route, frame)
-                }
-            }
+        val producer = new DeliveryProducer() {
+          override def connection = Some(StompProtocolHandler.this.connection)
 
-          case Some(route)=>
-            // we can re-use the existing producer route
-            send_via_route(route, frame)
+          override def dispatchQueue = queue
+        }
 
+        // don't process frames until producer is connected...
+        connection.transport.suspendRead
+        host.router.connect(destiantion, producer) {
+          route =>
+            if (!connection.stopped) {
+              connection.transport.resumeRead
+              route.refiller = ^ {
+                connection.transport.resumeRead
+              }
+              producerRoutes += destiantion -> route
+              send_via_route(route, frame, uow)
+            }
         }
 
-      case None=>
-        frame.release
-        die("destination not set.")
+      case Some(route) =>
+        // we can re-use the existing producer route
+        send_via_route(route, frame, uow)
+
     }
   }
 
+
   var message_id_counter = 0;
   def next_message_id = {
     message_id_counter += 1
@@ -346,9 +369,9 @@ class StompProtocolHandler extends Proto
     new AsciiBuffer("msg:"+message_id_counter);
   }
 
-  def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
+  def send_via_route(route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
     var storeBatch:StoreUOW=null
-    // User might be asking for ack that we have prcoessed the message..
+    // User might be asking for ack that we have processed the message..
     val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
 
     if( !route.targets.isEmpty ) {
@@ -366,6 +389,7 @@ class StompProtocolHandler extends Proto
       val delivery = new Delivery
       delivery.message = message
       delivery.size = message.frame.size
+      delivery.uow = uow
 
       if( receipt!=null ) {
         delivery.ack = { storeTx =>
@@ -490,12 +514,22 @@ class StompProtocolHandler extends Proto
 
   }
 
-  def on_stomp_ack(headers:HeaderMap) = {
+  def on_stomp_ack(frame:StompFrame) = {
+    val headers = frame.headers
     get(headers, Headers.Ack.MESSAGE_ID) match {
       case Some(messageId)=>
-        pendingAcks.remove(messageId) match {
+        pendingAcks.get(messageId) match {
           case Some(ack) =>
-            ack(null)
+            get(headers, Headers.TRANSACTION) match {
+              case None=>
+                perform_ack(frame)
+              case Some(txid)=>
+                get_or_create_tx_queue(txid){ txqueue=>
+                  txqueue.add(frame)
+                }
+            }
+
+
           case None =>
             // This can easily happen if the consumer is doing client acks on something like
             // a non-durable topic.
@@ -505,6 +539,13 @@ class StompProtocolHandler extends Proto
     }
   }
 
+  def perform_ack(frame: StompFrame, uow:StoreUOW=null) = {
+    val msgid = get(frame.headers, Headers.Ack.MESSAGE_ID).get
+    pendingAcks.remove(msgid) match {
+      case Some(ack) => ack(uow)
+      case None => die("message allready acked: %s".format(msgid))
+    }
+  }
 
   private def die(msg:String) = {
     if( !connection.stopped ) {
@@ -524,5 +565,102 @@ class StompProtocolHandler extends Proto
       super.onTransportFailure(error);
     }
   }
+
+
+  def require_transaction_header[T](headers:HeaderMap)(proc:(AsciiBuffer)=>T):Option[T] = {
+    get(headers, Headers.TRANSACTION) match {
+      case None=> die("transaction header not set")
+      None
+      case Some(txid)=> Some(proc(txid))
+    }
+  }
+
+  def on_stomp_begin(headers:HeaderMap) = {
+    require_transaction_header(headers){ txid=>create_tx_queue(txid){ _ => send_receipt(headers) }  }
+  }
+
+  def on_stomp_commit(headers:HeaderMap) = {
+    require_transaction_header(headers){ txid=>remove_tx_queue(txid){ _.commit { send_receipt(headers) } } }
+  }
+
+  def on_stomp_abort(headers:HeaderMap) = {
+    require_transaction_header(headers){ txid=>remove_tx_queue(txid){ _.rollback { send_receipt(headers) } } }
+  }
+
+
+  def send_receipt(headers:HeaderMap) = {
+    get(headers, Stomp.Headers.RECEIPT_REQUESTED) match {
+      case Some(receipt)=>
+        connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+      case None=>
+    }
+  }
+
+  class TransactionQueue {
+    // TODO: eventually we want to back this /w a broker Queue which
+    // can provides persistence and memory swapping.
+
+    val queue = ListBuffer[StompFrame]()
+
+    def add(frame:StompFrame) = {
+      queue += frame
+    }
+
+    def commit(onComplete: => Unit) = {
+
+      val uow = if( host.store!=null ) {
+        host.store.createStoreUOW
+      } else {
+        null
+      }
+
+      queue.foreach { frame=>
+        frame.action match {
+          case Commands.SEND =>
+            perform_send(frame, uow)
+          case Commands.ACK =>
+            perform_ack(frame, uow)
+          case _ => throw new java.lang.AssertionError("assertion failed: only send or ack frames are transactional")
+        }
+      }
+      if( uow!=null ) {
+        uow.onComplete(^{
+          onComplete
+        })
+        uow.release
+      } else {
+        onComplete
+      }
+
+    }
+
+    def rollback(onComplete: => Unit) = {
+      queue.clear
+      onComplete
+    }
+
+  }
+
+  val transactions = HashMap[AsciiBuffer, TransactionQueue]()
+
+  def create_tx_queue(txid:AsciiBuffer)(proc:(TransactionQueue)=>Unit) = {
+    if ( transactions.contains(txid) ) {
+      die("transaction allready started")
+    } else {
+      proc( transactions.put(txid, new TransactionQueue).get )
+    }
+  }
+
+  def get_or_create_tx_queue(txid:AsciiBuffer)(proc:(TransactionQueue)=>Unit) = {
+    proc(transactions.getOrElseUpdate(txid, new TransactionQueue))
+  }
+
+  def remove_tx_queue(txid:AsciiBuffer)(proc:(TransactionQueue)=>Unit) = {
+    transactions.remove(txid) match {
+      case None=> die("transaction not active: %d".format(txid))
+      case Some(txqueue)=> proc(txqueue)
+    }
+  }
+
 }