You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:09:18 UTC
svn commit: r961137 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
activemq-stomp/src/test/scala/org/apache/activemq/apollo...
Author: chirino
Date: Wed Jul 7 04:09:18 2010
New Revision: 961137
URL: http://svn.apache.org/viewvc?rev=961137&view=rev
Log:
stomp protocol handler now supports multiple consumer and producers on 1 connnection and client acks
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961137&r1=961136&r2=961137&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:09:18 2010
@@ -107,7 +107,7 @@ class Queue(val host: VirtualHost, val d
/**
* Tunning options.
*/
- var tune_max_size = 1024 * 1024 * 4
+ var tune_max_size = 1024 * 256
var tune_subscription_prefetch = 1024*32
var tune_max_outbound_size = 1024 * 1204 * 5
var tune_swap_delay = 100L
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961137&r1=961136&r2=961137&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 04:09:18 2010
@@ -29,7 +29,8 @@ import org.apache.activemq.apollo.broker
object StompFrameConstants {
type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)]
- var NO_DATA = new Buffer(0);
+ val NO_DATA = new Buffer(0);
+
}
import StompFrameConstants._
@@ -38,7 +39,7 @@ import BufferConversions._
case class StompFrameMessage(frame:StompFrame) extends Message {
- def protocol = "stomp"
+ def protocol = PROTOCOL
/**
* the globally unique id of the message
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961137&r1=961136&r2=961137&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 04:09:18 2010
@@ -52,9 +52,11 @@ object StompConstants {
}
class StompProtocol extends Protocol {
+ import StompConstants._
+
val wff = new StompWireFormatFactory
- def name = new AsciiBuffer("stomp")
+ def name = PROTOCOL
def createWireFormat = wff.createWireFormat
@@ -82,7 +84,7 @@ class StompProtocolHandler extends Proto
protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
- class SimpleConsumer(val destination:Destination) extends BaseRetained with DeliveryConsumer {
+ class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer, val selector:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
val dispatchQueue = StompProtocolHandler.this.dispatchQueue
dispatchQueue.retain
@@ -92,15 +94,18 @@ class StompProtocolHandler extends Proto
})
def matches(delivery:Delivery) = {
- // TODO add selector support here...
- delivery.message.isInstanceOf[StompFrameMessage]
+ if( delivery.message.protocol eq PROTOCOL ) {
+ true
+ } else {
+ false
+ }
}
def connect(p:DeliveryProducer) = new DeliverySession {
retain
def producer = p
- def consumer = SimpleConsumer.this
+ def consumer = StompConsumer.this
val session = session_manager.open(producer.dispatchQueue)
@@ -115,8 +120,17 @@ class StompProtocolHandler extends Proto
if( session.full ) {
false
} else {
- if( delivery.ack!=null ) {
- delivery.ack(null)
+ if( delivery.ack!=null) {
+ if( ackMode eq Headers.Subscribe.AckModeValues.AUTO ) {
+ delivery.ack(null)
+ } else {
+ // switch the the queue context.. this method is in the producer's context.
+ queue {
+ // we need to correlate acks from the client.. to invoke the
+ // delivery ack.
+ pendingAcks += ( delivery.message.id->delivery.ack )
+ }
+ }
}
val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
val rc = session.offer(frame)
@@ -135,12 +149,13 @@ class StompProtocolHandler extends Proto
var connection_sink:Sink[StompFrame] = null
var closed = false
- var consumer:SimpleConsumer = null
+ var consumers = Map[AsciiBuffer, StompConsumer]()
- var producerRoute:DeliveryProducerRoute=null
+ var producerRoutes = Map[Destination, DeliveryProducerRoute]()
var host:VirtualHost = null
private def queue = connection.dispatchQueue
+ var pendingAcks = HashMap[AsciiBuffer, (StoreBatch)=>Unit]()
override def onTransportConnected() = {
session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){ x=>x }, dispatchQueue, StompFrame)
@@ -158,14 +173,15 @@ class StompProtocolHandler extends Proto
override def onTransportDisconnected() = {
if( !closed ) {
closed=true;
- if( producerRoute!=null ) {
- host.router.disconnect(producerRoute)
- producerRoute=null
+ producerRoutes.foreach{
+ case(_,route)=> host.router.disconnect(route)
}
- if( consumer!=null ) {
+ producerRoutes = Map()
+ consumers.foreach {
+ case (_,consumer)=>
host.router.unbind(consumer.destination, consumer::Nil)
- consumer=null
}
+ consumers = Map()
trace("stomp protocol resources released")
}
}
@@ -177,7 +193,7 @@ class StompProtocolHandler extends Proto
case StompFrame(Commands.SEND, headers, content, _) =>
on_stomp_send(command.asInstanceOf[StompFrame])
case StompFrame(Commands.ACK, headers, content, _) =>
- // TODO:
+ on_stomp_ack(headers)
case StompFrame(Commands.SUBSCRIBE, headers, content, _) =>
info("got command: %s", command)
on_stomp_subscribe(headers)
@@ -221,36 +237,34 @@ class StompProtocolHandler extends Proto
get(frame.headers, Headers.Send.DESTINATION) match {
case Some(dest)=>
val destiantion:Destination = dest
- // create the producer route...
- if( producerRoute==null || producerRoute.destination!=destiantion ) {
-
- // clean up the previous producer..
- if( producerRoute!=null ) {
- host.router.disconnect(producerRoute)
- producerRoute=null
- }
-
- val producer = new DeliveryProducer() {
- override def dispatchQueue = queue
- }
- // don't process frames until we are connected..
- connection.transport.suspendRead
- host.router.connect(destiantion, producer) {
- (route) =>
- if( !connection.stopped ) {
- connection.transport.resumeRead
- producerRoute = route
- producerRoute.refiller = ^{
+ producerRoutes.get(destiantion) match {
+ case None=>
+ // create the producer route...
+
+ val producer = new DeliveryProducer() {
+ override def dispatchQueue = queue
+ }
+
+ // don't process frames until producer is connected...
+ connection.transport.suspendRead
+ host.router.connect(destiantion, producer) { route =>
+ if( !connection.stopped ) {
connection.transport.resumeRead
+ route.refiller = ^{
+ connection.transport.resumeRead
+ }
+ producerRoutes += destiantion->route
+ send_via_route(route, frame)
}
- send_via_route(producerRoute, frame)
- }
- }
- } else {
- // we can re-use the existing producer route
- send_via_route(producerRoute, frame)
+ }
+
+ case Some(route)=>
+ // we can re-use the existing producer route
+ send_via_route(route, frame)
+
}
+
case None=>
die("destination not set.")
}
@@ -268,14 +282,15 @@ class StompProtocolHandler extends Proto
if( !route.targets.isEmpty ) {
// We may need to add some headers..
- var message = if( frame.header(Stomp.Headers.Message.MESSAGE_ID)==null ) {
- var updated_headers:HeaderMap=Nil;
- updated_headers ::= (Stomp.Headers.Message.MESSAGE_ID, next_message_id)
- StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content, updated_headers))
- } else {
- StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content))
+ var message = get( frame.headers, Stomp.Headers.Message.MESSAGE_ID) match {
+ case None=>
+ var updated_headers:HeaderMap=Nil;
+ updated_headers ::= (Stomp.Headers.Message.MESSAGE_ID, next_message_id)
+ StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content, updated_headers))
+ case Some(id)=>
+ StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content))
}
-
+
val delivery = new Delivery
delivery.message = message
delivery.size = message.frame.size
@@ -308,14 +323,37 @@ class StompProtocolHandler extends Proto
get(headers, Headers.Subscribe.DESTINATION) match {
case Some(dest)=>
val destiantion:Destination = dest
- if( consumer !=null ) {
- die("Only one subscription supported.")
- } else {
- info("subscribing to: %s", destiantion)
- consumer = new SimpleConsumer(destiantion);
- host.router.bind(destiantion, consumer :: Nil)
- consumer.release
+ var id:AsciiBuffer = get(headers, Headers.Subscribe.ID) match {
+ case None => dest
+ case Some(x)=> x
+ }
+
+ val ack:AsciiBuffer = get(headers, Headers.Subscribe.ACK_MODE) match {
+ case None=> Headers.Subscribe.AckModeValues.AUTO
+ case Some(x)=> x match {
+ case Headers.Subscribe.AckModeValues.AUTO=> Headers.Subscribe.AckModeValues.AUTO
+ case Headers.Subscribe.AckModeValues.CLIENT=> Headers.Subscribe.AckModeValues.CLIENT
+ case ack:AsciiBuffer => die("Unsuported ack mode: "+ack); null
+ }
+ }
+
+ val selector = get(headers, Headers.Subscribe.SELECTOR) match {
+ case None=> null
+ case Some(x)=> x
+ }
+
+
+ consumers.get(id) match {
+ case None=>
+ info("subscribing to: %s", destiantion)
+ val consumer = new StompConsumer(destiantion, ack, selector);
+ host.router.bind(destiantion, consumer :: Nil)
+ consumer.release
+ consumers += (id -> consumer)
+
+ case Some(_)=>
+ die("A subscription with identified with '"+id+"' allready exists")
}
case None=>
die("destination not set.")
@@ -323,11 +361,25 @@ class StompProtocolHandler extends Proto
}
+ def on_stomp_ack(headers:HeaderMap) = {
+ get(headers, Headers.Ack.MESSAGE_ID) match {
+ case Some(messageId)=>
+ pendingAcks.remove(messageId) match {
+ case Some(ack) =>
+ ack(null)
+ case None =>
+ die("The specified message id is not waiting for a client ack: "+messageId)
+ }
+ case None=> die("message id header not set")
+ }
+ }
+
+
private def die(msg:String) = {
if( !connection.stopped ) {
info("Shutting connection down due to: "+msg)
connection.transport.suspendRead
- connection.transport.offer(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
+ connection.transport.offer(StompFrame(Responses.ERROR, Nil, ascii(msg)))
^ {
connection.stop()
} >>: queue
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961137&r1=961136&r2=961137&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 04:09:18 2010
@@ -102,6 +102,16 @@ class StompWireFormat extends WireFormat
frame.action.writeTo(os)
os.write(NEWLINE)
+ // Write any updated headers first...
+ if( !frame.updated_headers.isEmpty ) {
+ for( (key, value) <- frame.updated_headers ) {
+ key.writeTo(os)
+ os.write(SEPERATOR)
+ value.writeTo(os)
+ os.write(NEWLINE)
+ }
+ }
+
// we can optimize a little if the headers and content are in the same buffer..
if( !frame.headers.isEmpty && !frame.content.isEmpty &&
( frame.headers.head._1.data eq frame.content.data ) ) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961137&r1=961136&r2=961137&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul 7 04:09:18 2010
@@ -19,11 +19,14 @@ package org.apache.activemq.apollo.stomp
import _root_.java.io._
import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import _root_.org.fusesource.hawtbuf.AsciiBuffer
+import _root_.org.fusesource.hawtbuf.{ByteArrayOutputStream => BAOS}
+
import java.net.{ProtocolException, InetSocketAddress, URI, Socket}
import java.lang.String._
import java.util.concurrent.TimeUnit._
import collection.mutable.Map
+import org.apache.activemq.apollo.stomp.Stomp
/**
*
@@ -46,9 +49,10 @@ object StompLoadClient {
var messageSize = 1024;
var useContentLength=true
var persistent = false;
- var syncProducer = false;
+ var syncSend = false;
+ var ack = "client";
- var destinationType = "topic";
+ var destinationType = "queue";
var destinationCount = 1;
val producerCounter = new AtomicLong();
@@ -128,18 +132,24 @@ object StompLoadClient {
"StompLoadClient Properties\n"+
"--------------------------------------\n"+
"uri = "+uri+"\n"+
- "producers = "+producers+"\n"+
- "consumers = "+consumers+"\n"+
"destinationType = "+destinationType+"\n"+
"destinationCount = "+destinationCount+"\n" +
+ "sampleInterval = "+sampleInterval+"\n"
+ "\n"+
+ "--- Producer Properties ---\n"+
+ "producers = "+producers+"\n"+
"messageSize = "+messageSize+"\n"+
"persistent = "+persistent+"\n"+
- "syncProducer = "+syncProducer+"\n"+
+ "syncSend = "+syncSend+"\n"+
+ "useContentLength = "+useContentLength+"\n"+
"producerSleep = "+producerSleep+"\n"+
+ "\n"+
+ "--- Consumer Properties ---\n"+
+ "consumers = "+consumers+"\n"+
"consumerSleep = "+consumerSleep+"\n"+
- "bufferSize = "+bufferSize+"\n"+
- "useContentLength = "+useContentLength+"\n"+
- "sampleInterval = "+sampleInterval+"\n"
+ "ack = "+ack+"\n"+
+ ""
+
}
def printRate(name: String, counter: AtomicLong, nanos: Long) = {
@@ -231,7 +241,7 @@ object StompLoadClient {
}
def receive():String = {
- val buffer = new ByteArrayOutputStream(500)
+ val buffer = new ByteArrayOutputStream(messageSize+200)
var c = in.read;
while( c >= 0 ) {
if( c==0 ) {
@@ -243,6 +253,19 @@ object StompLoadClient {
throw new EOFException()
}
+ def receiveAscii():AsciiBuffer = {
+ val buffer = new BAOS(messageSize+200)
+ var c = in.read;
+ while( c >= 0 ) {
+ if( c==0 ) {
+ return buffer.toBuffer.ascii
+ }
+ buffer.write(c);
+ c = in.read()
+ }
+ throw new EOFException()
+ }
+
def receive(expect:String):String = {
val rc = receive()
if( !rc.trimFront.startsWith(expect) ) {
@@ -259,7 +282,7 @@ object StompLoadClient {
val content = ("SEND\n" +
"destination:"+destination(id)+"\n"+
{ if(persistent) "persistent:true\n" else "" } +
- { if(syncProducer) "receipt:xxx\n" else "" } +
+ { if(syncSend) "receipt:xxx\n" else "" } +
{ if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
"\n"+message(name)).getBytes("UTF-8")
@@ -270,7 +293,7 @@ object StompLoadClient {
var i =0;
while (!done.get) {
client.send(content)
- if( syncProducer ) {
+ if( syncSend ) {
// waits for the reply..
client.flush
client.skip
@@ -312,17 +335,37 @@ object StompLoadClient {
val headers = Map[AsciiBuffer, AsciiBuffer]();
client.send("""
SUBSCRIBE
+ack:"""+ack+"""
destination:"""+destination(id)+"""
""")
client.flush
+ receiveLoop
+ }
+ }
+ }
- while (!done.get) {
- client.skip
- consumerCounter.incrementAndGet();
- Thread.sleep(consumerSleep);
- }
+ def receiveLoop() = {
+ val clientAck = ack == "client"
+ while (!done.get) {
+ if( clientAck ) {
+ val msg = client.receiveAscii()
+ val start = msg.indexOf(Stomp.Headers.Message.MESSAGE_ID)
+ assert( start >= 0 )
+ val end = msg.indexOf("\n", start)
+ val msgId = msg.slice(start+Stomp.Headers.Message.MESSAGE_ID.length+1, end).ascii
+ client.send("""
+ACK
+message-id:"""+msgId+"""
+
+""")
+ client.flush
+
+ } else {
+ client.skip
}
+ consumerCounter.incrementAndGet();
+ Thread.sleep(consumerSleep);
}
}
}