You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/09/27 23:47:18 UTC
svn commit: r1001918 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Author: chirino
Date: Mon Sep 27 21:47:18 2010
New Revision: 1001918
URL: http://svn.apache.org/viewvc?rev=1001918&view=rev
Log:
Adding some initial support for transactions.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1001918&r1=1001917&r2=1001918&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Mon Sep 27 21:47:18 2010
@@ -413,11 +413,15 @@ case class DeliveryProducerRoute(val rou
targets.foreach { target=>
- // only delivery to matching consumers
+ // only deliver to matching consumers
if( target.consumer.matches(delivery) ) {
if( storeOnMatch ) {
- delivery.uow = router.host.store.createStoreUOW
+ if( delivery.uow==null ) {
+ delivery.uow = router.host.store.createStoreUOW
+ } else {
+ delivery.uow.retain
+ }
delivery.storeKey = delivery.uow.store(delivery.createMessageRecord)
storeOnMatch = false
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1001918&r1=1001917&r2=1001918&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Mon Sep 27 21:47:18 2010
@@ -255,10 +255,18 @@ class StompProtocolHandler extends Proto
override def onTransportCommand(command:Any) = {
try {
command match {
- case StompFrame(Commands.SEND, headers, content, _) =>
+ case StompFrame(Commands.SEND, _, _, _) =>
on_stomp_send(command.asInstanceOf[StompFrame])
case StompFrame(Commands.ACK, headers, content, _) =>
- on_stomp_ack(headers)
+ on_stomp_ack(command.asInstanceOf[StompFrame])
+
+ case StompFrame(Commands.BEGIN, headers, content, _) =>
+ on_stomp_begin(headers)
+ case StompFrame(Commands.COMMIT, headers, content, _) =>
+ on_stomp_commit(headers)
+ case StompFrame(Commands.ABORT, headers, content, _) =>
+ on_stomp_abort(headers)
+
case StompFrame(Commands.SUBSCRIBE, headers, content, _) =>
info("got command: %s", command)
on_stomp_subscribe(headers)
@@ -299,46 +307,61 @@ class StompProtocolHandler extends Proto
}
def on_stomp_send(frame:StompFrame) = {
+
get(frame.headers, Headers.Send.DESTINATION) match {
+ case None=>
+ frame.release
+ die("destination not set.")
+
case Some(dest)=>
- val destiantion:Destination = dest
- producerRoutes.get(destiantion) match {
+ get(frame.headers, Headers.TRANSACTION) match {
case None=>
- // create the producer route...
+ perform_send(frame)
+ case Some(txid)=>
+ get_or_create_tx_queue(txid){ txqueue=>
+ txqueue.add(frame)
+ }
+ }
- val producer = new DeliveryProducer() {
+ }
+ }
- override def connection = Some( StompProtocolHandler.this.connection )
+ def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
- override def dispatchQueue = queue
- }
+ val destiantion: Destination = get(frame.headers, Headers.Send.DESTINATION).get
+ producerRoutes.get(destiantion) match {
+ case None =>
+ // create the producer route...
- // don't process frames until producer is connected...
- connection.transport.suspendRead
- host.router.connect(destiantion, producer) { route =>
- if( !connection.stopped ) {
- connection.transport.resumeRead
- route.refiller = ^{
- connection.transport.resumeRead
- }
- producerRoutes += destiantion->route
- send_via_route(route, frame)
- }
- }
+ val producer = new DeliveryProducer() {
+ override def connection = Some(StompProtocolHandler.this.connection)
- case Some(route)=>
- // we can re-use the existing producer route
- send_via_route(route, frame)
+ override def dispatchQueue = queue
+ }
+ // don't process frames until producer is connected...
+ connection.transport.suspendRead
+ host.router.connect(destiantion, producer) {
+ route =>
+ if (!connection.stopped) {
+ connection.transport.resumeRead
+ route.refiller = ^ {
+ connection.transport.resumeRead
+ }
+ producerRoutes += destiantion -> route
+ send_via_route(route, frame, uow)
+ }
}
- case None=>
- frame.release
- die("destination not set.")
+ case Some(route) =>
+ // we can re-use the existing producer route
+ send_via_route(route, frame, uow)
+
}
}
+
var message_id_counter = 0;
def next_message_id = {
message_id_counter += 1
@@ -346,9 +369,9 @@ class StompProtocolHandler extends Proto
new AsciiBuffer("msg:"+message_id_counter);
}
- def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
+ def send_via_route(route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
var storeBatch:StoreUOW=null
- // User might be asking for ack that we have prcoessed the message..
+ // User might be asking for ack that we have processed the message..
val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
if( !route.targets.isEmpty ) {
@@ -366,6 +389,7 @@ class StompProtocolHandler extends Proto
val delivery = new Delivery
delivery.message = message
delivery.size = message.frame.size
+ delivery.uow = uow
if( receipt!=null ) {
delivery.ack = { storeTx =>
@@ -490,12 +514,22 @@ class StompProtocolHandler extends Proto
}
- def on_stomp_ack(headers:HeaderMap) = {
+ def on_stomp_ack(frame:StompFrame) = {
+ val headers = frame.headers
get(headers, Headers.Ack.MESSAGE_ID) match {
case Some(messageId)=>
- pendingAcks.remove(messageId) match {
+ pendingAcks.get(messageId) match {
case Some(ack) =>
- ack(null)
+ get(headers, Headers.TRANSACTION) match {
+ case None=>
+ perform_ack(frame)
+ case Some(txid)=>
+ get_or_create_tx_queue(txid){ txqueue=>
+ txqueue.add(frame)
+ }
+ }
+
+
case None =>
// This can easily happen if the consumer is doing client acks on something like
// a non-durable topic.
@@ -505,6 +539,13 @@ class StompProtocolHandler extends Proto
}
}
+ def perform_ack(frame: StompFrame, uow:StoreUOW=null) = {
+ val msgid = get(frame.headers, Headers.Ack.MESSAGE_ID).get
+ pendingAcks.remove(msgid) match {
+ case Some(ack) => ack(uow)
+ case None => die("message allready acked: %s".format(msgid))
+ }
+ }
private def die(msg:String) = {
if( !connection.stopped ) {
@@ -524,5 +565,102 @@ class StompProtocolHandler extends Proto
super.onTransportFailure(error);
}
}
+
+
+ def require_transaction_header[T](headers:HeaderMap)(proc:(AsciiBuffer)=>T):Option[T] = {
+ get(headers, Headers.TRANSACTION) match {
+ case None=> die("transaction header not set")
+ None
+ case Some(txid)=> Some(proc(txid))
+ }
+ }
+
+ def on_stomp_begin(headers:HeaderMap) = {
+ require_transaction_header(headers){ txid=>create_tx_queue(txid){ _ => send_receipt(headers) } }
+ }
+
+ def on_stomp_commit(headers:HeaderMap) = {
+ require_transaction_header(headers){ txid=>remove_tx_queue(txid){ _.commit { send_receipt(headers) } } }
+ }
+
+ def on_stomp_abort(headers:HeaderMap) = {
+ require_transaction_header(headers){ txid=>remove_tx_queue(txid){ _.rollback { send_receipt(headers) } } }
+ }
+
+
+ def send_receipt(headers:HeaderMap) = {
+ get(headers, Stomp.Headers.RECEIPT_REQUESTED) match {
+ case Some(receipt)=>
+ connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+ case None=>
+ }
+ }
+
+ class TransactionQueue {
+ // TODO: eventually we want to back this /w a broker Queue which
+ // can provides persistence and memory swapping.
+
+ val queue = ListBuffer[StompFrame]()
+
+ def add(frame:StompFrame) = {
+ queue += frame
+ }
+
+ def commit(onComplete: => Unit) = {
+
+ val uow = if( host.store!=null ) {
+ host.store.createStoreUOW
+ } else {
+ null
+ }
+
+ queue.foreach { frame=>
+ frame.action match {
+ case Commands.SEND =>
+ perform_send(frame, uow)
+ case Commands.ACK =>
+ perform_ack(frame, uow)
+ case _ => throw new java.lang.AssertionError("assertion failed: only send or ack frames are transactional")
+ }
+ }
+ if( uow!=null ) {
+ uow.onComplete(^{
+ onComplete
+ })
+ uow.release
+ } else {
+ onComplete
+ }
+
+ }
+
+ def rollback(onComplete: => Unit) = {
+ queue.clear
+ onComplete
+ }
+
+ }
+
+ val transactions = HashMap[AsciiBuffer, TransactionQueue]()
+
+ def create_tx_queue(txid:AsciiBuffer)(proc:(TransactionQueue)=>Unit) = {
+ if ( transactions.contains(txid) ) {
+ die("transaction allready started")
+ } else {
+ proc( transactions.put(txid, new TransactionQueue).get )
+ }
+ }
+
+ def get_or_create_tx_queue(txid:AsciiBuffer)(proc:(TransactionQueue)=>Unit) = {
+ proc(transactions.getOrElseUpdate(txid, new TransactionQueue))
+ }
+
+ def remove_tx_queue(txid:AsciiBuffer)(proc:(TransactionQueue)=>Unit) = {
+ transactions.remove(txid) match {
+ case None=> die("transaction not active: %d".format(txid))
+ case Some(txqueue)=> proc(txqueue)
+ }
+ }
+
}