You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:09:18 UTC

svn commit: r961137 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ activemq-stomp/src/test/scala/org/apache/activemq/apollo...

Author: chirino
Date: Wed Jul  7 04:09:18 2010
New Revision: 961137

URL: http://svn.apache.org/viewvc?rev=961137&view=rev
Log:
stomp protocol handler now supports multiple consumer and producers on 1 connnection and client acks

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961137&r1=961136&r2=961137&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:09:18 2010
@@ -107,7 +107,7 @@ class Queue(val host: VirtualHost, val d
   /**
    * Tunning options.
    */
-  var tune_max_size = 1024 * 1024 * 4
+  var tune_max_size = 1024 * 256
   var tune_subscription_prefetch = 1024*32
   var tune_max_outbound_size = 1024 * 1204 * 5
   var tune_swap_delay = 100L

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961137&r1=961136&r2=961137&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 04:09:18 2010
@@ -29,7 +29,8 @@ import org.apache.activemq.apollo.broker
 object StompFrameConstants {
   type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
   type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)]
-  var NO_DATA = new Buffer(0);
+  val NO_DATA = new Buffer(0);
+
 }
 
 import StompFrameConstants._
@@ -38,7 +39,7 @@ import BufferConversions._
 
 case class StompFrameMessage(frame:StompFrame) extends Message {
   
-  def protocol = "stomp"
+  def protocol = PROTOCOL
 
   /**
    * the globally unique id of the message

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961137&r1=961136&r2=961137&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 04:09:18 2010
@@ -52,9 +52,11 @@ object StompConstants {
 }
 
 class StompProtocol extends Protocol {
+  import StompConstants._
+  
   val wff = new StompWireFormatFactory
 
-  def name = new AsciiBuffer("stomp")
+  def name = PROTOCOL
 
   def createWireFormat = wff.createWireFormat
 
@@ -82,7 +84,7 @@ class StompProtocolHandler extends Proto
   
   protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
-  class SimpleConsumer(val destination:Destination) extends BaseRetained with DeliveryConsumer {
+  class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer, val selector:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
     val dispatchQueue = StompProtocolHandler.this.dispatchQueue
 
     dispatchQueue.retain
@@ -92,15 +94,18 @@ class StompProtocolHandler extends Proto
     })
 
     def matches(delivery:Delivery) = {
-      // TODO add selector support here...
-      delivery.message.isInstanceOf[StompFrameMessage]
+      if( delivery.message.protocol eq PROTOCOL ) {
+        true
+      } else {
+        false
+      }
     }
 
     def connect(p:DeliveryProducer) = new DeliverySession {
       retain
 
       def producer = p
-      def consumer = SimpleConsumer.this
+      def consumer = StompConsumer.this
 
       val session = session_manager.open(producer.dispatchQueue)
 
@@ -115,8 +120,17 @@ class StompProtocolHandler extends Proto
         if( session.full ) {
           false
         } else {
-          if( delivery.ack!=null ) {
-            delivery.ack(null)
+          if( delivery.ack!=null) {
+            if( ackMode eq Headers.Subscribe.AckModeValues.AUTO ) {
+              delivery.ack(null)
+            } else {
+              // switch the the queue context.. this method is in the producer's context.
+              queue {
+                // we need to correlate acks from the client.. to invoke the
+                // delivery ack.
+                pendingAcks += ( delivery.message.id->delivery.ack )
+              }
+            }
           }
           val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
           val rc = session.offer(frame)
@@ -135,12 +149,13 @@ class StompProtocolHandler extends Proto
   var connection_sink:Sink[StompFrame] = null
 
   var closed = false
-  var consumer:SimpleConsumer = null
+  var consumers = Map[AsciiBuffer, StompConsumer]()
 
-  var producerRoute:DeliveryProducerRoute=null
+  var producerRoutes = Map[Destination, DeliveryProducerRoute]()
   var host:VirtualHost = null
 
   private def queue = connection.dispatchQueue
+  var pendingAcks = HashMap[AsciiBuffer, (StoreBatch)=>Unit]()
 
   override def onTransportConnected() = {
     session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){ x=>x }, dispatchQueue, StompFrame)
@@ -158,14 +173,15 @@ class StompProtocolHandler extends Proto
   override def onTransportDisconnected() = {
     if( !closed ) {
       closed=true;
-      if( producerRoute!=null ) {
-        host.router.disconnect(producerRoute)
-        producerRoute=null
+      producerRoutes.foreach{
+        case(_,route)=> host.router.disconnect(route)
       }
-      if( consumer!=null ) {
+      producerRoutes = Map()
+      consumers.foreach {
+        case (_,consumer)=>
         host.router.unbind(consumer.destination, consumer::Nil)
-        consumer=null
       }
+      consumers = Map()
       trace("stomp protocol resources released")
     }
   }
@@ -177,7 +193,7 @@ class StompProtocolHandler extends Proto
         case StompFrame(Commands.SEND, headers, content, _) =>
           on_stomp_send(command.asInstanceOf[StompFrame])
         case StompFrame(Commands.ACK, headers, content, _) =>
-          // TODO:
+          on_stomp_ack(headers)
         case StompFrame(Commands.SUBSCRIBE, headers, content, _) =>
           info("got command: %s", command)
           on_stomp_subscribe(headers)
@@ -221,36 +237,34 @@ class StompProtocolHandler extends Proto
     get(frame.headers, Headers.Send.DESTINATION) match {
       case Some(dest)=>
         val destiantion:Destination = dest
-        // create the producer route...
-        if( producerRoute==null || producerRoute.destination!=destiantion ) {
-
-          // clean up the previous producer..
-          if( producerRoute!=null ) {
-            host.router.disconnect(producerRoute)
-            producerRoute=null
-          }
-
-          val producer = new DeliveryProducer() {
-            override def dispatchQueue = queue
-          }
 
-          // don't process frames until we are connected..
-          connection.transport.suspendRead
-          host.router.connect(destiantion, producer) {
-            (route) =>
-              if( !connection.stopped ) {
-                connection.transport.resumeRead
-                producerRoute = route
-                producerRoute.refiller = ^{
+        producerRoutes.get(destiantion) match {
+          case None=>
+            // create the producer route...
+
+            val producer = new DeliveryProducer() {
+              override def dispatchQueue = queue
+            }
+
+            // don't process frames until producer is connected...
+            connection.transport.suspendRead
+            host.router.connect(destiantion, producer) { route =>
+                if( !connection.stopped ) {
                   connection.transport.resumeRead
+                  route.refiller = ^{
+                    connection.transport.resumeRead
+                  }
+                  producerRoutes += destiantion->route
+                  send_via_route(route, frame)
                 }
-                send_via_route(producerRoute, frame)
-              }
-          }
-        } else {
-          // we can re-use the existing producer route
-          send_via_route(producerRoute, frame)
+            }
+
+          case Some(route)=>
+            // we can re-use the existing producer route
+            send_via_route(route, frame)
+
         }
+
       case None=>
         die("destination not set.")
     }
@@ -268,14 +282,15 @@ class StompProtocolHandler extends Proto
     if( !route.targets.isEmpty ) {
 
       // We may need to add some headers..
-      var message = if( frame.header(Stomp.Headers.Message.MESSAGE_ID)==null ) {
-        var updated_headers:HeaderMap=Nil;
-        updated_headers ::= (Stomp.Headers.Message.MESSAGE_ID, next_message_id)
-        StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content, updated_headers))
-      } else {
-        StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content))
+      var message = get( frame.headers, Stomp.Headers.Message.MESSAGE_ID) match {
+        case None=>
+          var updated_headers:HeaderMap=Nil;
+          updated_headers ::= (Stomp.Headers.Message.MESSAGE_ID, next_message_id)
+          StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content, updated_headers))
+        case Some(id)=>
+          StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content))
       }
-      
+
       val delivery = new Delivery
       delivery.message = message
       delivery.size = message.frame.size
@@ -308,14 +323,37 @@ class StompProtocolHandler extends Proto
     get(headers, Headers.Subscribe.DESTINATION) match {
       case Some(dest)=>
         val destiantion:Destination = dest
-        if( consumer !=null ) {
-          die("Only one subscription supported.")
 
-        } else {
-          info("subscribing to: %s", destiantion)
-          consumer = new SimpleConsumer(destiantion);
-          host.router.bind(destiantion, consumer :: Nil)
-          consumer.release
+        var id:AsciiBuffer = get(headers, Headers.Subscribe.ID) match {
+          case None => dest
+          case Some(x)=> x
+        }
+
+        val ack:AsciiBuffer = get(headers, Headers.Subscribe.ACK_MODE) match {
+          case None=> Headers.Subscribe.AckModeValues.AUTO
+          case Some(x)=> x match {
+            case Headers.Subscribe.AckModeValues.AUTO=> Headers.Subscribe.AckModeValues.AUTO
+            case Headers.Subscribe.AckModeValues.CLIENT=> Headers.Subscribe.AckModeValues.CLIENT
+            case ack:AsciiBuffer => die("Unsuported ack mode: "+ack); null
+          }
+        }
+
+        val selector = get(headers, Headers.Subscribe.SELECTOR) match {
+          case None=> null
+          case Some(x)=> x
+        }
+
+
+        consumers.get(id) match {
+          case None=>
+            info("subscribing to: %s", destiantion)
+            val consumer = new StompConsumer(destiantion, ack, selector);
+            host.router.bind(destiantion, consumer :: Nil)
+            consumer.release
+            consumers += (id -> consumer)
+
+          case Some(_)=>
+            die("A subscription with identified with '"+id+"' allready exists")
         }
       case None=>
         die("destination not set.")
@@ -323,11 +361,25 @@ class StompProtocolHandler extends Proto
 
   }
 
+  def on_stomp_ack(headers:HeaderMap) = {
+    get(headers, Headers.Ack.MESSAGE_ID) match {
+      case Some(messageId)=>
+        pendingAcks.remove(messageId) match {
+          case Some(ack) =>
+            ack(null)
+          case None =>
+            die("The specified message id is not waiting for a client ack: "+messageId)
+        }
+      case None=> die("message id header not set")
+    }
+  }
+
+
   private def die(msg:String) = {
     if( !connection.stopped ) {
       info("Shutting connection down due to: "+msg)
       connection.transport.suspendRead
-      connection.transport.offer(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
+      connection.transport.offer(StompFrame(Responses.ERROR, Nil, ascii(msg)))
       ^ {
         connection.stop()
       } >>: queue

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961137&r1=961136&r2=961137&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 04:09:18 2010
@@ -102,6 +102,16 @@ class StompWireFormat extends WireFormat
     frame.action.writeTo(os)
     os.write(NEWLINE)
 
+    // Write any updated headers first...
+    if( !frame.updated_headers.isEmpty ) {
+      for( (key, value) <- frame.updated_headers ) {
+        key.writeTo(os)
+        os.write(SEPERATOR)
+        value.writeTo(os)
+        os.write(NEWLINE)
+      }
+    }
+
     // we can optimize a little if the headers and content are in the same buffer..
     if( !frame.headers.isEmpty && !frame.content.isEmpty &&
             ( frame.headers.head._1.data eq frame.content.data ) ) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961137&r1=961136&r2=961137&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul  7 04:09:18 2010
@@ -19,11 +19,14 @@ package org.apache.activemq.apollo.stomp
 import _root_.java.io._
 import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import _root_.org.fusesource.hawtbuf.AsciiBuffer
+import _root_.org.fusesource.hawtbuf.{ByteArrayOutputStream => BAOS}
+
 import java.net.{ProtocolException, InetSocketAddress, URI, Socket}
 
 import java.lang.String._
 import java.util.concurrent.TimeUnit._
 import collection.mutable.Map
+import org.apache.activemq.apollo.stomp.Stomp
 
 /**
  *
@@ -46,9 +49,10 @@ object StompLoadClient {
   var messageSize = 1024;
   var useContentLength=true
   var persistent = false;
-  var syncProducer = false;
+  var syncSend = false;
+  var ack = "client";
 
-  var destinationType = "topic";
+  var destinationType = "queue";
   var destinationCount = 1;
 
   val producerCounter = new AtomicLong();
@@ -128,18 +132,24 @@ object StompLoadClient {
     "StompLoadClient Properties\n"+
     "--------------------------------------\n"+
     "uri              = "+uri+"\n"+
-    "producers        = "+producers+"\n"+
-    "consumers        = "+consumers+"\n"+
     "destinationType  = "+destinationType+"\n"+
     "destinationCount = "+destinationCount+"\n" +
+    "sampleInterval   = "+sampleInterval+"\n"
+    "\n"+
+    "--- Producer Properties ---\n"+
+    "producers        = "+producers+"\n"+
     "messageSize      = "+messageSize+"\n"+
     "persistent       = "+persistent+"\n"+
-    "syncProducer     = "+syncProducer+"\n"+
+    "syncSend         = "+syncSend+"\n"+
+    "useContentLength = "+useContentLength+"\n"+
     "producerSleep    = "+producerSleep+"\n"+
+    "\n"+
+    "--- Consumer Properties ---\n"+
+    "consumers        = "+consumers+"\n"+
     "consumerSleep    = "+consumerSleep+"\n"+
-    "bufferSize       = "+bufferSize+"\n"+
-    "useContentLength = "+useContentLength+"\n"+
-    "sampleInterval   = "+sampleInterval+"\n"
+    "ack              = "+ack+"\n"+
+    ""
+
   }
 
   def printRate(name: String, counter: AtomicLong, nanos: Long) = {
@@ -231,7 +241,7 @@ object StompLoadClient {
     }
 
     def receive():String = {
-      val buffer = new ByteArrayOutputStream(500)
+      val buffer = new ByteArrayOutputStream(messageSize+200)
       var c = in.read;
       while( c >= 0 ) {
         if( c==0 ) {
@@ -243,6 +253,19 @@ object StompLoadClient {
       throw new EOFException()
     }
 
+    def receiveAscii():AsciiBuffer = {
+      val buffer = new BAOS(messageSize+200)
+      var c = in.read;
+      while( c >= 0 ) {
+        if( c==0 ) {
+          return buffer.toBuffer.ascii
+        }
+        buffer.write(c);
+        c = in.read()
+      }
+      throw new EOFException()
+    }
+
     def receive(expect:String):String = {
       val rc = receive()
       if( !rc.trimFront.startsWith(expect) ) {
@@ -259,7 +282,7 @@ object StompLoadClient {
     val content = ("SEND\n" +
               "destination:"+destination(id)+"\n"+
                { if(persistent) "persistent:true\n" else "" } +
-               { if(syncProducer) "receipt:xxx\n" else "" } +
+               { if(syncSend) "receipt:xxx\n" else "" } +
                { if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
               "\n"+message(name)).getBytes("UTF-8")
 
@@ -270,7 +293,7 @@ object StompLoadClient {
           var i =0;
           while (!done.get) {
             client.send(content)
-            if( syncProducer ) {
+            if( syncSend ) {
               // waits for the reply..
               client.flush
               client.skip
@@ -312,17 +335,37 @@ object StompLoadClient {
           val headers = Map[AsciiBuffer, AsciiBuffer]();
           client.send("""
 SUBSCRIBE
+ack:"""+ack+"""
 destination:"""+destination(id)+"""
 
 """)
           client.flush
+          receiveLoop
+        }
+      }
+    }
 
-          while (!done.get) {
-            client.skip
-            consumerCounter.incrementAndGet();
-            Thread.sleep(consumerSleep);
-          }
+    def receiveLoop() = {
+      val clientAck = ack == "client"
+      while (!done.get) {
+        if( clientAck ) {
+          val msg = client.receiveAscii()
+          val start = msg.indexOf(Stomp.Headers.Message.MESSAGE_ID)
+          assert( start >= 0 )
+          val end = msg.indexOf("\n", start)
+          val msgId = msg.slice(start+Stomp.Headers.Message.MESSAGE_ID.length+1, end).ascii
+          client.send("""
+ACK
+message-id:"""+msgId+"""
+
+""")
+          client.flush
+
+        } else {
+          client.skip
         }
+        consumerCounter.incrementAndGet();
+        Thread.sleep(consumerSleep);
       }
     }
   }