You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:46:01 UTC

svn commit: r961081 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ activemq-stomp/src/main/scala/org/apache/activemq...

Author: chirino
Date: Wed Jul  7 03:46:01 2010
New Revision: 961081

URL: http://svn.apache.org/viewvc?rev=961081&view=rev
Log:
you can now add header updates to a stomp frame

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961081&r1=961080&r2=961081&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:46:01 2010
@@ -390,7 +390,7 @@ trait DeliverySink {
   def send(delivery:Delivery):Unit
 }
 
-class TransportDeliverySink(val transport:Transport) extends DeliverySink {
+class TransportDeliverySink(var transport:Transport) extends DeliverySink {
   def full:Boolean = transport.isFull
   def send(delivery:Delivery) = transport.oneway(delivery.message, delivery)
 }
@@ -472,7 +472,7 @@ class DeliveryOverflowBuffer(val deliver
 
 }
 
-class DeliverySessionManager(val delivery_buffer:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
+class DeliverySessionManager(val sink:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
 
   var sessions = List[SessionServer]()
 
@@ -494,7 +494,7 @@ class DeliverySessionManager(val deliver
   def drain_source = {
     val deliveries = source.getData
     deliveries.foreach { delivery=>
-      delivery_buffer.send(delivery)
+      sink.send(delivery)
       delivery.release
     }
   }
@@ -514,7 +514,7 @@ class DeliverySessionManager(val deliver
 
     val client = new SessionClient()
 
-    class SessionClient() extends DeliveryOverflowBuffer(delivery_buffer) {
+    class SessionClient() extends DeliveryOverflowBuffer(sink) {
 
       producer_queue.retain
       val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961081&r1=961080&r2=961081&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul  7 03:46:01 2010
@@ -55,6 +55,7 @@ abstract class RemoteConsumer extends Co
 
   override def onTransportConnected() = {
     setupSubscription();
+    transport.resumeRead
   }
 
   override def onTransportFailure(error: IOException) = {
@@ -116,6 +117,7 @@ abstract class RemoteProducer extends Co
 
   override def onTransportConnected() = {
     setupProducer();
+    transport.resumeRead
   }
 
   def setupProducer()
@@ -254,7 +256,7 @@ abstract class BaseBrokerPerfTest {
     consumerCount = 1;
 
     createConnections();
-    producers.get(0).thinkTime = 500000*1000;
+//    producers.get(0).thinkTime = 500000*1000;
 
     // Start 'em up.
     startClients();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961081&r1=961080&r2=961081&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 03:46:01 2010
@@ -35,45 +35,8 @@ object StompFrameConstants {
 import StompFrameConstants._
 import StompConstants._;
 import BufferConversions._
-  
-/**
- * Represents all the data in a STOMP frame.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:Buffer=NO_DATA) extends Message {
-
-  def headerSize = {
-    if( headers.isEmpty ) {
-      0
-    } else {
-      // if all the headers were part of the same input buffer.. size can be calculated by
-      // subtracting positions in the buffer.
-      val firstBuffer = headers.head._1
-      val lastBuffer =  headers.last._2
-      if( firstBuffer.data eq lastBuffer.data ) {
-        (lastBuffer.offset-firstBuffer.offset)+lastBuffer.length+1
-      } else {
-        // gota do it the hard way
-        var rc = 0;
-        val i = headers.iterator
-        while( i.hasNext ) {
-          val (key, value) = i.next
-          rc += (key.length + value.length+2)
-        }
-        rc
-      }
-    }
-  }
 
-  def size = {
-     if( action.data eq content.data ) {
-        (content.offset-action.offset)+content.length
-     } else {
-       action.length + 1 +
-       headerSize + 1 + content.length
-     }
-  }
+case class StompFrameMessage(frame:StompFrame) extends Message {
 
   /**
    * the globally unique id of the message
@@ -133,7 +96,7 @@ case class StompFrame(action:AsciiBuffer
     }
   }
 
-  for( header <- headers ) {
+  for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
     header match {
       case (Stomp.Headers.Message.MESSAGE_ID, value) =>
         id = value
@@ -147,3 +110,59 @@ case class StompFrame(action:AsciiBuffer
     }
   }
 }
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:Buffer=NO_DATA, updated_headers:HeaderMap=Nil) {
+
+  def size_of_updated_headers = {
+    size_of(updated_headers)
+  }
+
+  def size_of_original_headers = {
+    if( headers.isEmpty ) {
+      0
+    } else {
+      // if all the headers were part of the same input buffer.. size can be calculated by
+      // subtracting positions in the buffer.
+      val firstBuffer = headers.head._1
+      val lastBuffer =  headers.last._2
+      if( firstBuffer.data eq lastBuffer.data ) {
+        (lastBuffer.offset-firstBuffer.offset)+lastBuffer.length+1
+      } else {
+        // gotta do it the hard way
+        size_of(headers)
+      }
+    }
+  }
+
+  private def size_of(headers:HeaderMap): Int = {
+    var rc = 0;
+    val i = headers.iterator
+    while (i.hasNext) {
+      val (key, value) = i.next
+      rc += (key.length + value.length + 2)
+    }
+    rc
+  }
+
+  def size = {
+     if( (action.data eq content.data) && updated_headers==Nil ) {
+        (content.offset-action.offset)+content.length
+     } else {
+       action.length + 1 +
+       size_of_updated_headers +
+       size_of_original_headers + 1 + content.length
+     }
+  }
+
+  def header(name:AsciiBuffer) = {
+    updated_headers.filter( _._1 == name ).headOption.orElse(
+      headers.filter( _._1 == name ).headOption
+    ).map(_._2).getOrElse(null)
+  }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961081&r1=961080&r2=961081&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:46:01 2010
@@ -99,7 +99,9 @@ class StompProtocolHandler extends Proto
   private def queue = connection.dispatchQueue
 
   override def onTransportConnected() = {
-    outboundChannel = new TransportDeliverySink(connection.transport)
+    outboundChannel = new TransportDeliverySink(connection.transport) {
+      override def send(delivery: Delivery) = transport.oneway(delivery.message.asInstanceOf[StompFrameMessage].frame, delivery)
+    }
     connection.broker.runtime.getDefaultVirtualHost(
       queue.wrap { (host)=>
         this.host=host
@@ -128,23 +130,23 @@ class StompProtocolHandler extends Proto
   def onTransportCommand(command:Any) = {
     try {
       command match {
-        case StompFrame(Commands.SEND, headers, content) =>
+        case StompFrame(Commands.SEND, headers, content, _) =>
           on_stomp_send(command.asInstanceOf[StompFrame])
-        case StompFrame(Commands.ACK, headers, content) =>
+        case StompFrame(Commands.ACK, headers, content, _) =>
           // TODO:
-        case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+        case StompFrame(Commands.SUBSCRIBE, headers, content, _) =>
           info("got command: %s", command)
           on_stomp_subscribe(headers)
-        case StompFrame(Commands.CONNECT, headers, _) =>
+        case StompFrame(Commands.CONNECT, headers, _, _) =>
           info("got command: %s", command)
           on_stomp_connect(headers)
-        case StompFrame(Commands.DISCONNECT, headers, content) =>
+        case StompFrame(Commands.DISCONNECT, headers, content, _t) =>
           info("got command: %s", command)
           connection.stop
         case s:StompWireFormat =>
           // this is passed on to us by the protocol discriminator
           // so we know which wire format is being used.
-        case StompFrame(unknown, _, _) =>
+        case StompFrame(unknown, _, _, _) =>
           die("Unsupported STOMP command: "+unknown);
         case _ =>
           die("Unsupported command: "+command);
@@ -214,9 +216,26 @@ class StompProtocolHandler extends Proto
     }
   }
 
+  var message_id_counter = 0;
+  def next_message_id = {
+    message_id_counter += 1
+    // TODO: properly generate message ids
+    new AsciiBuffer("msg:"+message_id_counter);
+  }
+
   def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
     if( !route.targets.isEmpty ) {
-      val delivery = Delivery(frame, frame.size)
+
+      // We may need to add some headers..
+      var message = if( frame.header(Stomp.Headers.Message.MESSAGE_ID)==null ) {
+        var updated_headers:HeaderMap=Nil;
+        updated_headers ::= (Stomp.Headers.Message.MESSAGE_ID, next_message_id)
+        StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content, updated_headers))
+      } else {
+        StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content))
+      }
+      
+      val delivery = Delivery(message, message.frame.size)
       connection.transport.suspendRead
       delivery.setDisposer(^{
         connection.transport.resumeRead

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961081&r1=961080&r2=961081&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 03:46:01 2010
@@ -69,10 +69,10 @@ class StompRemoteConsumer extends Remote
     def onTransportCommand(command:Object) = {
       var frame = command.asInstanceOf[StompFrame]
       frame match {
-        case StompFrame(Responses.CONNECTED, headers, _) =>
-        case StompFrame(Responses.MESSAGE, headers, content) =>
+        case StompFrame(Responses.CONNECTED, headers, _, _) =>
+        case StompFrame(Responses.MESSAGE, headers, content, _) =>
           messageReceived();
-        case StompFrame(Responses.ERROR, headers, content) =>
+        case StompFrame(Responses.ERROR, headers, content, _) =>
           onFailure(new Exception("Server reported an error: " + frame.content));
         case _ =>
           onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -137,14 +137,14 @@ class StompRemoteProducer extends Remote
           stompDestination = ascii("/topic/"+destination.getName().toString());
       }
       transport.oneway(StompFrame(Stomp.Commands.CONNECT), null);
-
+      send_next
     }
 
     def onTransportCommand(command:Object) = {
       var frame = command.asInstanceOf[StompFrame]
       frame match {
-        case StompFrame(Responses.CONNECTED, headers, _) =>
-        case StompFrame(Responses.ERROR, headers, content) =>
+        case StompFrame(Responses.CONNECTED, headers, _, _) =>
+        case StompFrame(Responses.ERROR, headers, content, _) =>
           onFailure(new Exception("Server reported an error: " + frame.content.utf8));
         case _ =>
           onFailure(new Exception("Unexpected stomp command: " + frame.action));