You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:46:01 UTC
svn commit: r961081 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
activemq-stomp/src/main/scala/org/apache/activemq...
Author: chirino
Date: Wed Jul 7 03:46:01 2010
New Revision: 961081
URL: http://svn.apache.org/viewvc?rev=961081&view=rev
Log:
you can now add header updates to a stomp frame
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961081&r1=961080&r2=961081&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:46:01 2010
@@ -390,7 +390,7 @@ trait DeliverySink {
def send(delivery:Delivery):Unit
}
-class TransportDeliverySink(val transport:Transport) extends DeliverySink {
+class TransportDeliverySink(var transport:Transport) extends DeliverySink {
def full:Boolean = transport.isFull
def send(delivery:Delivery) = transport.oneway(delivery.message, delivery)
}
@@ -472,7 +472,7 @@ class DeliveryOverflowBuffer(val deliver
}
-class DeliverySessionManager(val delivery_buffer:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
+class DeliverySessionManager(val sink:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
var sessions = List[SessionServer]()
@@ -494,7 +494,7 @@ class DeliverySessionManager(val deliver
def drain_source = {
val deliveries = source.getData
deliveries.foreach { delivery=>
- delivery_buffer.send(delivery)
+ sink.send(delivery)
delivery.release
}
}
@@ -514,7 +514,7 @@ class DeliverySessionManager(val deliver
val client = new SessionClient()
- class SessionClient() extends DeliveryOverflowBuffer(delivery_buffer) {
+ class SessionClient() extends DeliveryOverflowBuffer(sink) {
producer_queue.retain
val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961081&r1=961080&r2=961081&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul 7 03:46:01 2010
@@ -55,6 +55,7 @@ abstract class RemoteConsumer extends Co
override def onTransportConnected() = {
setupSubscription();
+ transport.resumeRead
}
override def onTransportFailure(error: IOException) = {
@@ -116,6 +117,7 @@ abstract class RemoteProducer extends Co
override def onTransportConnected() = {
setupProducer();
+ transport.resumeRead
}
def setupProducer()
@@ -254,7 +256,7 @@ abstract class BaseBrokerPerfTest {
consumerCount = 1;
createConnections();
- producers.get(0).thinkTime = 500000*1000;
+// producers.get(0).thinkTime = 500000*1000;
// Start 'em up.
startClients();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961081&r1=961080&r2=961081&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 03:46:01 2010
@@ -35,45 +35,8 @@ object StompFrameConstants {
import StompFrameConstants._
import StompConstants._;
import BufferConversions._
-
-/**
- * Represents all the data in a STOMP frame.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:Buffer=NO_DATA) extends Message {
-
- def headerSize = {
- if( headers.isEmpty ) {
- 0
- } else {
- // if all the headers were part of the same input buffer.. size can be calculated by
- // subtracting positions in the buffer.
- val firstBuffer = headers.head._1
- val lastBuffer = headers.last._2
- if( firstBuffer.data eq lastBuffer.data ) {
- (lastBuffer.offset-firstBuffer.offset)+lastBuffer.length+1
- } else {
- // gota do it the hard way
- var rc = 0;
- val i = headers.iterator
- while( i.hasNext ) {
- val (key, value) = i.next
- rc += (key.length + value.length+2)
- }
- rc
- }
- }
- }
- def size = {
- if( action.data eq content.data ) {
- (content.offset-action.offset)+content.length
- } else {
- action.length + 1 +
- headerSize + 1 + content.length
- }
- }
+case class StompFrameMessage(frame:StompFrame) extends Message {
/**
* the globally unique id of the message
@@ -133,7 +96,7 @@ case class StompFrame(action:AsciiBuffer
}
}
- for( header <- headers ) {
+ for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
header match {
case (Stomp.Headers.Message.MESSAGE_ID, value) =>
id = value
@@ -147,3 +110,59 @@ case class StompFrame(action:AsciiBuffer
}
}
}
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:Buffer=NO_DATA, updated_headers:HeaderMap=Nil) {
+
+ def size_of_updated_headers = {
+ size_of(updated_headers)
+ }
+
+ def size_of_original_headers = {
+ if( headers.isEmpty ) {
+ 0
+ } else {
+ // if all the headers were part of the same input buffer.. size can be calculated by
+ // subtracting positions in the buffer.
+ val firstBuffer = headers.head._1
+ val lastBuffer = headers.last._2
+ if( firstBuffer.data eq lastBuffer.data ) {
+ (lastBuffer.offset-firstBuffer.offset)+lastBuffer.length+1
+ } else {
+ // gotta do it the hard way
+ size_of(headers)
+ }
+ }
+ }
+
+ private def size_of(headers:HeaderMap): Int = {
+ var rc = 0;
+ val i = headers.iterator
+ while (i.hasNext) {
+ val (key, value) = i.next
+ rc += (key.length + value.length + 2)
+ }
+ rc
+ }
+
+ def size = {
+ if( (action.data eq content.data) && updated_headers==Nil ) {
+ (content.offset-action.offset)+content.length
+ } else {
+ action.length + 1 +
+ size_of_updated_headers +
+ size_of_original_headers + 1 + content.length
+ }
+ }
+
+ def header(name:AsciiBuffer) = {
+ updated_headers.filter( _._1 == name ).headOption.orElse(
+ headers.filter( _._1 == name ).headOption
+ ).map(_._2).getOrElse(null)
+ }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961081&r1=961080&r2=961081&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:46:01 2010
@@ -99,7 +99,9 @@ class StompProtocolHandler extends Proto
private def queue = connection.dispatchQueue
override def onTransportConnected() = {
- outboundChannel = new TransportDeliverySink(connection.transport)
+ outboundChannel = new TransportDeliverySink(connection.transport) {
+ override def send(delivery: Delivery) = transport.oneway(delivery.message.asInstanceOf[StompFrameMessage].frame, delivery)
+ }
connection.broker.runtime.getDefaultVirtualHost(
queue.wrap { (host)=>
this.host=host
@@ -128,23 +130,23 @@ class StompProtocolHandler extends Proto
def onTransportCommand(command:Any) = {
try {
command match {
- case StompFrame(Commands.SEND, headers, content) =>
+ case StompFrame(Commands.SEND, headers, content, _) =>
on_stomp_send(command.asInstanceOf[StompFrame])
- case StompFrame(Commands.ACK, headers, content) =>
+ case StompFrame(Commands.ACK, headers, content, _) =>
// TODO:
- case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+ case StompFrame(Commands.SUBSCRIBE, headers, content, _) =>
info("got command: %s", command)
on_stomp_subscribe(headers)
- case StompFrame(Commands.CONNECT, headers, _) =>
+ case StompFrame(Commands.CONNECT, headers, _, _) =>
info("got command: %s", command)
on_stomp_connect(headers)
- case StompFrame(Commands.DISCONNECT, headers, content) =>
+ case StompFrame(Commands.DISCONNECT, headers, content, _t) =>
info("got command: %s", command)
connection.stop
case s:StompWireFormat =>
// this is passed on to us by the protocol discriminator
// so we know which wire format is being used.
- case StompFrame(unknown, _, _) =>
+ case StompFrame(unknown, _, _, _) =>
die("Unsupported STOMP command: "+unknown);
case _ =>
die("Unsupported command: "+command);
@@ -214,9 +216,26 @@ class StompProtocolHandler extends Proto
}
}
+ var message_id_counter = 0;
+ def next_message_id = {
+ message_id_counter += 1
+ // TODO: properly generate message ids
+ new AsciiBuffer("msg:"+message_id_counter);
+ }
+
def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
if( !route.targets.isEmpty ) {
- val delivery = Delivery(frame, frame.size)
+
+ // We may need to add some headers..
+ var message = if( frame.header(Stomp.Headers.Message.MESSAGE_ID)==null ) {
+ var updated_headers:HeaderMap=Nil;
+ updated_headers ::= (Stomp.Headers.Message.MESSAGE_ID, next_message_id)
+ StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content, updated_headers))
+ } else {
+ StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content))
+ }
+
+ val delivery = Delivery(message, message.frame.size)
connection.transport.suspendRead
delivery.setDisposer(^{
connection.transport.resumeRead
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961081&r1=961080&r2=961081&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:46:01 2010
@@ -69,10 +69,10 @@ class StompRemoteConsumer extends Remote
def onTransportCommand(command:Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
- case StompFrame(Responses.CONNECTED, headers, _) =>
- case StompFrame(Responses.MESSAGE, headers, content) =>
+ case StompFrame(Responses.CONNECTED, headers, _, _) =>
+ case StompFrame(Responses.MESSAGE, headers, content, _) =>
messageReceived();
- case StompFrame(Responses.ERROR, headers, content) =>
+ case StompFrame(Responses.ERROR, headers, content, _) =>
onFailure(new Exception("Server reported an error: " + frame.content));
case _ =>
onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -137,14 +137,14 @@ class StompRemoteProducer extends Remote
stompDestination = ascii("/topic/"+destination.getName().toString());
}
transport.oneway(StompFrame(Stomp.Commands.CONNECT), null);
-
+ send_next
}
def onTransportCommand(command:Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
- case StompFrame(Responses.CONNECTED, headers, _) =>
- case StompFrame(Responses.ERROR, headers, content) =>
+ case StompFrame(Responses.CONNECTED, headers, _, _) =>
+ case StompFrame(Responses.ERROR, headers, content, _) =>
onFailure(new Exception("Server reported an error: " + frame.content.utf8));
case _ =>
onFailure(new Exception("Unexpected stomp command: " + frame.action));