You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/10 14:33:31 UTC
svn commit: r1044356 - in
/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp:
StompProtocol.scala StompProtocolHandler.scala
Author: chirino
Date: Fri Dec 10 13:33:31 2010
New Revision: 1044356
URL: http://svn.apache.org/viewvc?rev=1044356&view=rev
Log:
Moving the StompProtocolHandler to it's own source file.
Added:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
- copied, changed from r1044209, activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1044356&r1=1044355&r2=1044356&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Fri Dec 10 13:33:31 2010
@@ -16,29 +16,13 @@
*/
package org.apache.activemq.apollo.stomp
-import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
import _root_.org.fusesource.hawtbuf._
-import collection.mutable.{ListBuffer, HashMap}
-import org.fusesource.hawtdispatch._
-
-import AsciiBuffer._
import org.apache.activemq.apollo.broker._
import java.lang.String
-import protocol.{HeartBeatMonitor, ProtocolFactory, Protocol, ProtocolHandler}
-import security.SecurityContext
+import protocol.{ProtocolFactory, Protocol}
import Stomp._
-import BufferConversions._
-import java.io.IOException
-import org.apache.activemq.apollo.selector.SelectorParser
-import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
import org.apache.activemq.apollo.transport._
import org.apache.activemq.apollo.store._
-import org.apache.activemq.apollo.util._
-import java.util.concurrent.TimeUnit
-import java.util.Map.Entry
-import org.apache.activemq.apollo.dto.{StompConnectionStatusDTO, BindingDTO, SubscriptionBindingDTO, QueueBindingDTO}
-import scala.util.continuations._
-
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@@ -97,868 +81,3 @@ object StompProtocol extends StompProtoc
}
-object StompProtocolHandler extends Log {
-
- // How long we hold a failed connection open so that the remote end
- // can get the resulting error message.
- val DEFAULT_DIE_DELAY = 5*1000L
- var die_delay = DEFAULT_DIE_DELAY
-
- // How often we can send heartbeats of the connection is idle.
- val DEFAULT_OUTBOUND_HEARTBEAT = 100L
- var outbound_heartbeat = DEFAULT_OUTBOUND_HEARTBEAT
-
- // How often we want to get heartbeats from the peer if the connection is idle.
- val DEFAULT_INBOUND_HEARTBEAT = 10*1000L
- var inbound_heartbeat = DEFAULT_INBOUND_HEARTBEAT
-
-}
-
-import StompProtocolHandler._
-
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class StompProtocolHandler extends ProtocolHandler with DispatchLogging {
-
- def protocol = "stomp"
-
- override protected def log = StompProtocolHandler
-
- protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
-
- trait AckHandler {
- def track(delivery:Delivery):Unit
- def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null):Unit
- }
-
- class AutoAckHandler extends AckHandler {
- def track(delivery:Delivery) = {
- if( delivery.ack!=null ) {
- delivery.ack(null)
- }
- }
-
- def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
- async_die("The subscription ack mode does not expect ACK frames")
- }
- }
-
- class SessionAckHandler extends AckHandler{
- var consumer_acks = ListBuffer[(AsciiBuffer, (StoreUOW)=>Unit)]()
-
- def track(delivery:Delivery) = {
- queue.apply {
- if( protocol_version eq V1_0 ) {
- // register on the connection since 1.0 acks may not include the subscription id
- connection_ack_handlers += ( delivery.message.id-> this )
- }
- consumer_acks += (( delivery.message.id, delivery.ack ))
- }
-
- }
-
-
- def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
-
- // session acks ack all previously recieved messages..
- var found = false
- val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
- if( found ) {
- false
- } else {
- if( id == msgid ) {
- found = true
- }
- true
- }
- }
-
- if( acked.isEmpty ) {
- async_die("ACK failed, invalid message id: %s".format(msgid))
- } else {
- consumer_acks = not_acked
- acked.foreach{case (id, ack)=>
- if( ack!=null ) {
- ack(uow)
- }
- }
- }
-
- if( protocol_version eq V1_0 ) {
- connection_ack_handlers.remove(msgid)
- }
- }
-
-
-
- }
- class MessageAckHandler extends AckHandler {
- var consumer_acks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
-
- def track(delivery:Delivery) = {
- queue.apply {
- if( protocol_version eq V1_0 ) {
- // register on the connection since 1.0 acks may not include the subscription id
- connection_ack_handlers += ( delivery.message.id-> this )
- }
- consumer_acks += ( delivery.message.id -> delivery.ack )
- }
- }
-
- def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
- consumer_acks.remove(msgid) match {
- case Some(ack) =>
- if( ack!=null ) {
- ack(uow)
- }
- case None => async_die("ACK failed, invalid message id: %s".format(msgid))
- }
-
- if( protocol_version eq V1_0 ) {
- connection_ack_handlers.remove(msgid)
- }
- }
- }
-
- class StompConsumer(val subscription_id:Option[AsciiBuffer], val destination:Destination, val ack_handler:AckHandler, val selector:(AsciiBuffer, BooleanExpression), val binding:BindingDTO) extends BaseRetained with DeliveryConsumer {
- val dispatchQueue = StompProtocolHandler.this.dispatchQueue
-
-
- dispatchQueue.retain
- setDisposer(^{
- session_manager.release
- dispatchQueue.release
- })
-
- override def connection = Some(StompProtocolHandler.this.connection)
-
- def is_persistent = false
-
- def matches(delivery:Delivery) = {
- if( delivery.message.protocol eq StompProtocol ) {
- if( selector!=null ) {
- selector._2.matches(delivery.message)
- } else {
- true
- }
- } else {
- false
- }
- }
-
- def connect(p:DeliveryProducer) = new DeliverySession {
- retain
-
- def producer = p
- def consumer = StompConsumer.this
-
- val session = session_manager.open(producer.dispatchQueue)
-
- def close = {
- session_manager.close(session)
- release
- }
-
- // Delegate all the flow control stuff to the session
- def full = session.full
- def offer(delivery:Delivery) = {
- if( session.full ) {
- false
- } else {
- ack_handler.track(delivery)
- var frame = delivery.message.asInstanceOf[StompFrameMessage].frame
- if( subscription_id != None ) {
- frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
- }
- frame.retain
- val rc = session.offer(frame)
- assert(rc, "offer should be accepted since it was not full")
- true
- }
- }
-
- def refiller = session.refiller
- def refiller_=(value:Runnable) = { session.refiller=value }
-
- }
- }
-
- var session_manager:SinkMux[StompFrame] = null
- var connection_sink:Sink[StompFrame] = null
-
- var closed = false
- var consumers = Map[AsciiBuffer, StompConsumer]()
-
- var producerRoutes = new LRUCache[Destination, DeliveryProducerRoute](10) {
- override def onCacheEviction(eldest: Entry[Destination, DeliveryProducerRoute]) = {
- host.router.disconnect(eldest.getValue)
- }
- }
-
- var host:VirtualHost = null
-
- private def queue = connection.dispatchQueue
-
- // uses by STOMP 1.0 clients
- var connection_ack_handlers = HashMap[AsciiBuffer, AckHandler]()
-
- var session_id:AsciiBuffer = _
- var protocol_version:AsciiBuffer = _
-
- var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
- val security_context = new SecurityContext
- var waiting_on:String = "client request"
-
-
- override def create_connection_status = {
- var rc = new StompConnectionStatusDTO
- rc.protocol_version = if( protocol_version == null ) null else protocol_version.toString
- rc.user = security_context.user
- rc.subscription_count = consumers.size
- rc.waiting_on = waiting_on
- rc
- }
-
- class ProtocolException(msg:String) extends RuntimeException(msg)
- class Break extends RuntimeException
-
- private def async_die(msg:String, e:Throwable=null) = try {
- die(msg)
- } catch {
- case x:Break=>
- }
-
- private def die[T](msg:String, e:Throwable=null):T = {
- if( e!=null) {
- debug(e, "Shutting connection down due to: "+msg)
- } else {
- debug("Shutting connection down due to: "+msg)
- }
- die((MESSAGE_HEADER, ascii(msg))::Nil, "")
- }
-
- private def die[T](headers:HeaderMap, body:String):T = {
- if( !connection.stopped ) {
- suspendRead("shutdown")
- connection.transport.offer(StompFrame(ERROR, headers, BufferContent(ascii(body))) )
- // TODO: if there are too many open connections we should just close the connection
- // without waiting for the error to get sent to the client.
- queue.after(die_delay, TimeUnit.MILLISECONDS) {
- connection.stop()
- }
- }
- throw new Break()
- }
-
- override def onTransportConnected() = {
-
- session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){x=>
- trace("sending frame: %s", x)
- x
- }, dispatchQueue, StompFrame)
- connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
- connection_sink.refiller = NOOP
- resumeRead
- }
-
- override def onTransportDisconnected() = {
- if( !closed ) {
- heart_beat_monitor.stop
- closed=true;
-
- import collection.JavaConversions._
- producerRoutes.foreach{
- case(_,route)=> host.router.disconnect(route)
- }
- producerRoutes.clear
- consumers.foreach {
- case (_,consumer)=>
- if( consumer.binding==null ) {
- host.router.unbind(consumer.destination, consumer)
- } else {
- reset {
- val queue = host.router.get_queue(consumer.binding)
- queue.foreach( _.unbind(consumer::Nil) )
- }
- }
- }
- consumers = Map()
- trace("stomp protocol resources released")
- }
- }
-
-
- override def onTransportCommand(command:Any) = {
- try {
- command match {
- case s:StompCodec =>
- // this is passed on to us by the protocol discriminator
- // so we know which wire format is being used.
- case frame:StompFrame=>
-
- trace("received frame: %s", frame)
-
- if( protocol_version == null ) {
-
- frame.action match {
- case STOMP =>
- on_stomp_connect(frame.headers)
- case CONNECT =>
- on_stomp_connect(frame.headers)
- case DISCONNECT =>
- connection.stop
- case _ =>
- die("Client must first send a connect frame");
- }
-
- } else {
- frame.action match {
- case SEND =>
- on_stomp_send(frame)
- case ACK =>
- on_stomp_ack(frame)
-
- case BEGIN =>
- on_stomp_begin(frame.headers)
- case COMMIT =>
- on_stomp_commit(frame.headers)
- case ABORT =>
- on_stomp_abort(frame.headers)
- case SUBSCRIBE =>
- on_stomp_subscribe(frame.headers)
- case UNSUBSCRIBE =>
- on_stomp_unsubscribe(frame.headers)
-
- case DISCONNECT =>
- connection.stop
-
- case _ =>
- die("Invalid frame: "+frame.action);
- }
- }
-
- case _=>
- warn("Internal Server Error: unexpected command type")
- die("Internal Server Error");
- }
- } catch {
- case e: Break =>
- case e:Exception =>
- async_die("Internal Server Error", e);
- }
- }
-
-
- def suspendRead(reason:String) = {
- waiting_on = reason
- connection.transport.suspendRead
- }
- def resumeRead() = {
- waiting_on = "client request"
- connection.transport.resumeRead
- }
-
- def weird(headers:HeaderMap) = {
- println("weird: "+headers)
- }
-
- def on_stomp_connect(headers:HeaderMap):Unit = {
-
- security_context.user = get(headers, LOGIN).toString
- security_context.password = get(headers, PASSCODE).toString
-
- val accept_versions = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii)
- protocol_version = SUPPORTED_PROTOCOL_VERSIONS.find( v=> accept_versions.contains(v) ) match {
- case Some(x) => x
- case None=>
- val supported_versions = SUPPORTED_PROTOCOL_VERSIONS.mkString(",")
- die((MESSAGE_HEADER, ascii("version not supported"))::
- (VERSION, ascii(supported_versions))::Nil,
- "Supported protocol versions are %s".format(supported_versions))
- }
-
- val heart_beat = get(headers, HEART_BEAT).getOrElse(DEFAULT_HEAT_BEAT)
- heart_beat.split(COMMA).map(_.ascii) match {
- case Array(cx,cy) =>
- try {
- val can_send = cx.toString.toLong
- val please_send = cy.toString.toLong
-
- if( inbound_heartbeat>=0 && can_send > 0 ) {
- heart_beat_monitor.read_interval = inbound_heartbeat.max(can_send)
-
- // lets be a little forgiving to account to packet transmission latency.
- heart_beat_monitor.read_interval += heart_beat_monitor.read_interval.min(5000)
-
- heart_beat_monitor.on_dead = () => {
- async_die("Stale connection. Missed heartbeat.")
- }
- }
- if( outbound_heartbeat>=0 && please_send > 0 ) {
- heart_beat_monitor.write_interval = outbound_heartbeat.max(please_send)
- heart_beat_monitor.on_keep_alive = () => {
- connection.transport.offer(NEWLINE_BUFFER)
- }
- }
-
- heart_beat_monitor.transport = connection.transport
- heart_beat_monitor.start
-
- } catch {
- case x:NumberFormatException=>
- die("Invalid heart-beat header: "+heart_beat)
- }
- case _ =>
- die("Invalid heart-beat header: "+heart_beat)
- }
-
- def noop = shift { k: (Unit=>Unit) => k() }
- reset {
- suspendRead("virtual host lookup")
- val host_header = get(headers, HOST)
- val host = host_header match {
- case None=>
- connection.connector.broker.getDefaultVirtualHost
- case Some(host)=>
- connection.connector.broker.getVirtualHost(host)
- }
- resumeRead
- if(host==null) {
- async_die("Invalid virtual host: "+host_header.get)
- noop // to make the cps compiler plugin happy.
- } else {
- this.host=host
-
- var authenticated = true;
-
- if( host.authenticator!=null ) {
- suspendRead("authenticating")
- authenticated = host.authenticator.authenticate(security_context)
- resumeRead
- } else {
- noop // to make the cps compiler plugin happy.
- }
-
- if( !authenticated ) {
- async_die("Authentication failed.")
- } else {
- val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
- session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
- if( connection_sink==null ) {
- weird(headers)
- }
- connection_sink.offer(
- StompFrame(CONNECTED, List(
- (VERSION, protocol_version),
- (SESSION, session_id),
- (HEART_BEAT, outbound_heart_beat_header)
- )))
-
- if( this.host.direct_buffer_pool!=null ) {
- val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
- wf.memory_pool = this.host.direct_buffer_pool
- }
- }
-
- }
- }
-
- }
-
- def get(headers:HeaderMap, names:List[AsciiBuffer]):List[Option[AsciiBuffer]] = {
- names.map(x=>get(headers, x))
- }
-
- def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
- val i = headers.iterator
- while( i.hasNext ) {
- val entry = i.next
- if( entry._1 == name ) {
- return Some(entry._2)
- }
- }
- None
- }
-
- def on_stomp_send(frame:StompFrame) = {
-
- get(frame.headers, DESTINATION) match {
- case None=>
- frame.release
- die("destination not set.")
-
- case Some(dest)=>
-
- get(frame.headers, TRANSACTION) match {
- case None=>
- perform_send(frame)
- case Some(txid)=>
- get_or_create_tx_queue(txid).add { uow=>
- perform_send(frame, uow)
- }
- }
-
- }
- }
-
- def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
-
- val destiantion: Destination = get(frame.headers, DESTINATION).get
- producerRoutes.get(destiantion) match {
- case null =>
- // create the producer route...
-
- val producer = new DeliveryProducer() {
- override def connection = Some(StompProtocolHandler.this.connection)
-
- override def dispatchQueue = queue
- }
-
- // don't process frames until producer is connected...
- connection.transport.suspendRead
- host.router.connect(destiantion, producer) {
- route =>
- if (!connection.stopped) {
- resumeRead
- route.refiller = ^ {
- resumeRead
- }
- producerRoutes.put(destiantion, route)
- send_via_route(route, frame, uow)
- }
- }
-
- case route =>
- // we can re-use the existing producer route
- send_via_route(route, frame, uow)
-
- }
- }
-
-
- var message_id_counter = 0;
- def next_message_id = {
- message_id_counter += 1
- // TODO: properly generate mesage ids
- new AsciiBuffer("msg:"+message_id_counter);
- }
-
- def send_via_route(route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
- var storeBatch:StoreUOW=null
- // User might be asking for ack that we have processed the message..
- val receipt = frame.header(RECEIPT_REQUESTED)
-
- if( !route.targets.isEmpty ) {
-
- // We may need to add some headers..
- var message = get( frame.headers, MESSAGE_ID) match {
- case None=>
- var updated_headers:HeaderMap=Nil;
- updated_headers ::= (MESSAGE_ID, next_message_id)
- StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, updated_headers))
- case Some(id)=>
- StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content))
- }
-
- val delivery = new Delivery
- delivery.message = message
- delivery.size = message.frame.size
- delivery.uow = uow
-
- if( receipt!=null ) {
- delivery.ack = { storeTx =>
- dispatchQueue <<| ^{
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
- }
- }
-
- // routes can always accept at least 1 delivery...
- assert( !route.full )
- route.offer(delivery)
- if( route.full ) {
- // but once it gets full.. suspend, so that we get more stomp messages
- // until it's not full anymore.
- suspendRead("blocked destination: "+route.destination)
- }
-
- } else {
- // info("Dropping message. No consumers interested in message.")
- if( receipt!=null ) {
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
- }
- frame.release
- }
-
- def on_stomp_subscribe(headers:HeaderMap):Unit = {
- val dest = get(headers, DESTINATION).getOrElse(die("destination not set."))
- val destination:Destination = dest
-
- val subscription_id = get(headers, ID)
- var id:AsciiBuffer = subscription_id.getOrElse {
- if( protocol_version eq V1_0 ) {
- // in 1.0 it's ok if the client does not send us the
- // the id header
- dest
- } else {
- die("The id header is missing from the SUBSCRIBE frame");
- }
-
- }
-
- val topic = destination.domain == Router.TOPIC_DOMAIN
- var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
-
- val ack = get(headers, ACK_MODE) match {
- case None=> new AutoAckHandler
- case Some(x)=> x match {
- case ACK_MODE_AUTO=>new AutoAckHandler
- case ACK_MODE_NONE=>new AutoAckHandler
- case ACK_MODE_CLIENT=> new SessionAckHandler
- case ACK_MODE_SESSION=> new SessionAckHandler
- case ACK_MODE_MESSAGE=> new MessageAckHandler
- case ack:AsciiBuffer =>
- die("Unsuported ack mode: "+ack);
- }
- }
-
- val selector = get(headers, SELECTOR) match {
- case None=> null
- case Some(x)=> x
- try {
- (x, SelectorParser.parse(x.utf8.toString))
- } catch {
- case e:FilterException =>
- die("Invalid selector expression: "+e.getMessage)
- }
- }
-
- if ( consumers.contains(id) ) {
- die("A subscription with identified with '"+id+"' allready exists")
- }
-
- val binding: BindingDTO = if( topic && !persistent ) {
- null
- } else {
- // Controls how the created queue gets bound
- // to the destination name space (this is used to
- // recover the queue on restart and rebind it the
- // way again)
- if (topic) {
- val rc = new SubscriptionBindingDTO
- rc.destination = DestinationParser.encode_path(destination.name)
- // TODO:
- // rc.client_id =
- rc.subscription_id = if( persistent ) id else null
- rc.filter = if (selector == null) null else selector._1
- rc
- } else {
- val rc = new QueueBindingDTO
- rc.destination = DestinationParser.encode_path(destination.name)
- rc
- }
- }
-
- val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding);
- consumers += (id -> consumer)
-
- if( binding==null ) {
-
- // consumer is bind bound as a topic
- reset {
- host.router.bind(destination, consumer)
- send_receipt(headers)
- consumer.release
- }
-
- } else {
- reset {
- // create a queue and bind the consumer to it.
- val x= host.router.create_queue(binding)
- x match {
- case Some(queue:Queue) =>
- queue.bind(consumer::Nil)
- send_receipt(headers)
- consumer.release
- case None => async_die("case not yet implemented.")
- }
- }
- }
- }
-
- def on_stomp_unsubscribe(headers:HeaderMap):Unit = {
-
- var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
-
- val id = get(headers, ID).getOrElse {
- if( protocol_version eq V1_0 ) {
- // in 1.0 it's ok if the client does not send us the
- // the id header, the destination header must be set
- get(headers, DESTINATION) match {
- case Some(dest)=> dest
- case None=>
- die("destination not set.")
- }
- } else {
- die("The id header is missing from the UNSUBSCRIBE frame");
- }
- }
-
- consumers.get(id) match {
- case None=>
- die("The subscription '%s' not found.".format(id))
- case Some(consumer)=>
- // consumer.close
- if( consumer.binding==null ) {
- host.router.unbind(consumer.destination, consumer)
- send_receipt(headers)
- } else {
-
- reset {
- val queue = host.router.get_queue(consumer.binding)
- queue.foreach( _.unbind(consumer::Nil) )
- }
-
- if( persistent && consumer.binding!=null ) {
- reset {
- val sucess = host.router.destroy_queue(consumer.binding)
- send_receipt(headers)
- }
- } else {
- send_receipt(headers)
- }
-
- }
-
- }
- }
-
- def on_stomp_ack(frame:StompFrame):Unit = {
- val headers = frame.headers
- val messageId = get(headers, MESSAGE_ID).getOrElse(die("message id header not set"))
-
- val subscription_id = get(headers, SUBSCRIPTION);
- val handler = subscription_id match {
- case None=>
- if( !(protocol_version eq V1_0) ) {
- die("The subscription header is required")
- }
- connection_ack_handlers.get(messageId).orElse(die("Not expecting ack for message id '%s'".format(messageId)))
- case Some(id) =>
- consumers.get(id).map(_.ack_handler).orElse(die("The subscription '%s' does not exist".format(id)))
- }
-
- handler.foreach{ handler=>
- get(headers, TRANSACTION) match {
- case None=>
- handler.perform_ack(messageId, null)
- case Some(txid)=>
- get_or_create_tx_queue(txid).add{ uow=>
- handler.perform_ack(messageId, uow)
- }
- }
- send_receipt(headers)
- }
- }
-
- override def onTransportFailure(error: IOException) = {
- if( !connection.stopped ) {
- suspendRead("shutdown")
- debug(error, "Shutting connection down due to: %s", error)
- super.onTransportFailure(error);
- }
- }
-
-
- def require_transaction_header[T](headers:HeaderMap):AsciiBuffer = {
- get(headers, TRANSACTION).getOrElse(die("transaction header not set"))
- }
-
- def on_stomp_begin(headers:HeaderMap) = {
- create_tx_queue(require_transaction_header(headers))
- send_receipt(headers)
- }
-
- def on_stomp_commit(headers:HeaderMap) = {
- remove_tx_queue(require_transaction_header(headers)).commit {
- send_receipt(headers)
- }
- }
-
- def on_stomp_abort(headers:HeaderMap) = {
- remove_tx_queue(require_transaction_header(headers)).rollback
- send_receipt(headers)
- }
-
-
- def send_receipt(headers:HeaderMap):Unit = {
- get(headers, RECEIPT_REQUESTED) match {
- case Some(receipt)=>
- dispatchQueue <<| ^{
- connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
- }
- case None=>
- }
- }
-
- class TransactionQueue {
- // TODO: eventually we want to back this /w a broker Queue which
- // can provides persistence and memory swapping.
-
- val queue = ListBuffer[(StoreUOW)=>Unit]()
-
- def add(proc:(StoreUOW)=>Unit):Unit = {
- queue += proc
- }
-
- def commit(onComplete: => Unit) = {
-
- val uow = if( host.store!=null ) {
- host.store.createStoreUOW
- } else {
- null
- }
-
- queue.foreach{ _(uow) }
- if( uow!=null ) {
- uow.onComplete(^{
- onComplete
- })
- uow.release
- } else {
- onComplete
- }
-
- }
-
- def rollback = {
- queue.clear
- }
-
- }
-
- val transactions = HashMap[AsciiBuffer, TransactionQueue]()
-
- def create_tx_queue(txid:AsciiBuffer):TransactionQueue = {
- if ( transactions.contains(txid) ) {
- die("transaction allready started")
- } else {
- val queue = new TransactionQueue
- transactions.put(txid, queue)
- queue
- }
- }
-
- def get_or_create_tx_queue(txid:AsciiBuffer):TransactionQueue = {
- transactions.getOrElseUpdate(txid, new TransactionQueue)
- }
-
- def remove_tx_queue(txid:AsciiBuffer):TransactionQueue = {
- transactions.remove(txid).getOrElse(die("transaction not active: %d".format(txid)))
- }
-
-}
-
Copied: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (from r1044209, activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?p2=activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala&p1=activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala&r1=1044209&r2=1044356&rev=1044356&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Dec 10 13:33:31 2010
@@ -1,5 +1,5 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.apollo.stomp
-import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
import _root_.org.fusesource.hawtbuf._
import collection.mutable.{ListBuffer, HashMap}
import org.fusesource.hawtdispatch._
@@ -24,14 +23,12 @@ import org.fusesource.hawtdispatch._
import AsciiBuffer._
import org.apache.activemq.apollo.broker._
import java.lang.String
-import protocol.{HeartBeatMonitor, ProtocolFactory, Protocol, ProtocolHandler}
+import protocol.{HeartBeatMonitor, ProtocolHandler}
import security.SecurityContext
import Stomp._
-import BufferConversions._
import java.io.IOException
import org.apache.activemq.apollo.selector.SelectorParser
import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
-import org.apache.activemq.apollo.transport._
import org.apache.activemq.apollo.store._
import org.apache.activemq.apollo.util._
import java.util.concurrent.TimeUnit
@@ -39,64 +36,6 @@ import java.util.Map.Entry
import org.apache.activemq.apollo.dto.{StompConnectionStatusDTO, BindingDTO, SubscriptionBindingDTO, QueueBindingDTO}
import scala.util.continuations._
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-/**
- * Creates StompCodec objects that encode/decode the
- * <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class StompProtocolCodecFactory extends ProtocolCodecFactory.Provider {
-
- def protocol = PROTOCOL
-
- def createProtocolCodec() = new StompCodec();
-
- def isIdentifiable() = true
-
- def maxIdentificaionLength() = CONNECT.length;
-
- def matchesIdentification(header: Buffer):Boolean = {
- if (header.length < CONNECT.length) {
- false
- } else {
- header.startsWith(CONNECT) || header.startsWith(STOMP)
- }
- }
-}
-
-class StompProtocolFactory extends ProtocolFactory.Provider {
-
- def create() = StompProtocol
-
- def create(config: String) = if(config == "stomp") {
- StompProtocol
- } else {
- null
- }
-
-}
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object StompProtocol extends StompProtocolCodecFactory with Protocol {
-
- def createProtocolHandler = new StompProtocolHandler
-
- def encode(message: Message):MessageRecord = {
- StompCodec.encode(message.asInstanceOf[StompFrameMessage])
- }
-
- def decode(message: MessageRecord) = {
- StompCodec.decode(message)
- }
-
-}
-
-
object StompProtocolHandler extends Log {
// How long we hold a failed connection open so that the remote end
@@ -139,7 +78,7 @@ class StompProtocolHandler extends Proto
delivery.ack(null)
}
}
-
+
def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
async_die("The subscription ack mode does not expect ACK frames")
}
@@ -752,8 +691,8 @@ class StompProtocolHandler extends Proto
rc.destination = DestinationParser.encode_path(destination.name)
// TODO:
// rc.client_id =
- rc.subscription_id = if( persistent ) id else null
- rc.filter = if (selector == null) null else selector._1
+ rc.subscription_id = if( persistent ) id.toString else null
+ rc.filter = if (selector == null) null else selector._1.toString
rc
} else {
val rc = new QueueBindingDTO