You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/14 20:47:56 UTC

svn commit: r1373023 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-stomp/src/main/scala/org/apache/act...

Author: chirino
Date: Tue Aug 14 18:47:56 2012
New Revision: 1373023

URL: http://svn.apache.org/viewvc?rev=1373023&view=rev
Log:
Implements APLO-242 - Support STOMP frames over UDP

Added:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Tue Aug 14 18:47:56 2012
@@ -16,18 +16,19 @@
  */
 package org.apache.activemq.apollo.broker.protocol
 
-import org.apache.activemq.apollo.broker.store.MessageRecord
 import org.fusesource.hawtdispatch.transport.ProtocolCodec
 import java.nio.ByteBuffer
 import org.fusesource.hawtdispatch._
 import java.nio.channels.{DatagramChannel, WritableByteChannel, ReadableByteChannel}
 import java.net.SocketAddress
-import org.apache.activemq.apollo.dto.{UdpDTO, AcceptingConnectorDTO}
+import org.apache.activemq.apollo.dto.{ProtocolDTO, UdpDTO, AcceptingConnectorDTO}
 import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
 import java.util.Map.Entry
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.broker._
 import org.apache.activemq.apollo.broker.security.SecurityContext
+import scala.Some
+import org.apache.activemq.apollo.broker.BlackHoleSink
 
 
 case class UdpMessage(from:SocketAddress, buffer:ByteBuffer)
@@ -108,6 +109,9 @@ trait DecodedUdpMessage {
    *         null if you want to bypass authentication and authorization.
    */
   def security_context:SecurityContext
+
+  def size = message.buffer.remaining()
+
 }
 
 /**
@@ -115,13 +119,12 @@ trait DecodedUdpMessage {
  */
 abstract class UdpProtocolHandler extends ProtocolHandler {
   import UdpProtocolHandler._
-
+  type ConfigTypeDTO <: ProtocolDTO
   def protocol = "udp"
-  def session_id = None
+  var session_id:Option[String] = None
 
-  var buffer_size = 64*1024
+  var buffer_size = 640*1024
   var connection_log:Log = _
-  var config:UdpDTO = _
   var messages_received = 0L
   var waiting_on = "client request"
 
@@ -135,23 +138,21 @@ abstract class UdpProtocolHandler extend
     rc
   }
 
-  def configure(config:UdpDTO) = {
-    this.config = config
-    buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
-  }
+  def configure(config:Option[ConfigTypeDTO]) = {}
 
   override def on_transport_connected = {
     connection.transport.resumeRead
     import collection.JavaConversions._
+    session_id = Some("%s-%x".format(broker.default_virtual_host.config.id, broker.default_virtual_host.session_counter.incrementAndGet))
 
-    configure((connection.connector.config match {
+    configure(connection.connector.config match {
       case connector_config:AcceptingConnectorDTO =>
         connector_config.protocols.flatMap{ _ match {
-          case x:UdpDTO => Some(x)
+          case x:ConfigTypeDTO => Some(x)
           case _ => None
         }}.headOption
       case _ => None
-    }).getOrElse(new UdpDTO) )
+    })
   }
 
   def suspend_read(reason: String) = {
@@ -175,7 +176,7 @@ abstract class UdpProtocolHandler extend
     }
   }
 
-  override def on_transport_command(command: AnyRef) = {
+  override def on_transport_command(command: AnyRef):Unit = {
     decode(command.asInstanceOf[UdpMessage]) match {
       case Some(msg) =>
         messages_received += 1
@@ -188,7 +189,14 @@ abstract class UdpProtocolHandler extend
         var sc_key = if( security_context!=null) security_context.to_key else null
         var route = producerRoutes.get((host, address, sc_key));
         if( route == null ) {
-          route = new UdpProducerRoute(host, address)
+          try {
+            route = new UdpProducerRoute(host, address)
+          } catch {
+            case e =>
+              // We could run into an error such as the address failing to parse
+              debug(e, "Could not create the producer route")
+              return
+          }
           producerRoutes.put((host, address, sc_key), route)
 
           def fail_connect = {
@@ -199,6 +207,7 @@ abstract class UdpProtocolHandler extend
           def continue_connect = host.dispatch_queue {
             host.router.connect(route.addresses, route, security_context) match {
               case Some(error) => queue {
+                debug("Could not connect the producer route: "+error)
                 fail_connect
               }
               case None =>
@@ -212,7 +221,9 @@ abstract class UdpProtocolHandler extend
                 resume_read
                 auth_failure match {
                   case null=> continue_connect
-                  case auth_failure=> fail_connect
+                  case auth_failure=>
+                    debug("Producer route failed authentication: "+auth_failure)
+                    fail_connect
                 }
               }
             }
@@ -239,8 +250,8 @@ abstract class UdpProtocolHandler extend
 
     val sink_switch = new MutableSink[Delivery]()
 
-    val inbound_queue = new OverflowSink[Delivery](sink_switch) {
-      override protected def onDelivered(value: Delivery) = {
+    val inbound_queue = new OverflowSink[DecodedUdpMessage](sink_switch.map(_.delivery)) {
+      override protected def onDelivered(value: DecodedUdpMessage) = {
         inbound_queue_size -= value.size
       }
     }
@@ -255,9 +266,8 @@ abstract class UdpProtocolHandler extend
         inbound_queue.removeFirst
       }
       
-      val delivery = frame.delivery
-      inbound_queue_size += delivery.size
-      inbound_queue.offer(delivery)
+      inbound_queue_size += frame.size
+      inbound_queue.offer(frame)
     }
   }
 
@@ -278,12 +288,14 @@ class UdpProtocol extends BaseProtocol {
   def createProtocolCodec:ProtocolCodec = new UdpProtocolCodec()
 
   def createProtocolHandler:ProtocolHandler = new UdpProtocolHandler {
+    type ConfigTypeDTO = UdpDTO
 
     var default_host:VirtualHost = _
     var topic_address:AsciiBuffer = _
 
-    override def configure(config: UdpDTO) {
-      super.configure(config)
+    override def configure(c: Option[ConfigTypeDTO]) = {
+      val config = c.getOrElse(new ConfigTypeDTO)
+      buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
       val topic_address_decoded = decode_address(Option(config.topic).getOrElse("udp"))
       topic_address = new AsciiBuffer(LocalRouter.destination_parser.encode_destination(topic_address_decoded))
       default_host = broker.default_virtual_host

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Tue Aug 14 18:47:56 2012
@@ -14,4 +14,5 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.stomp.StompProtocol
\ No newline at end of file
+org.apache.activemq.apollo.stomp.StompProtocol
+org.apache.activemq.apollo.stomp.StompUdpProtocol
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Tue Aug 14 18:47:56 2012
@@ -26,7 +26,7 @@ import java.io.{DataOutput, IOException}
 import org.fusesource.hawtdispatch.transport._
 import _root_.org.fusesource.hawtbuf._
 import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.apache.activemq.apollo.broker.store.{DirectBuffer, MessageRecord}
 import java.lang.ThreadLocal
 import java.util.ArrayList
 import collection.mutable.{ListBuffer, HashMap}
@@ -88,8 +88,10 @@ object StompCodec extends Log {
   }
 
   def decode(message: MessageRecord):StompFrameMessage = {
+    new StompFrameMessage(decode_frame(message.buffer, message.direct_buffer))
+  }
 
-    val buffer = message.buffer.buffer
+  def decode_frame(buffer: Buffer, direct_buffer:DirectBuffer=null):StompFrame = {
     def read_line = {
       val pos = buffer.indexOf('\n'.toByte)
       if( pos<0 ) {
@@ -124,10 +126,10 @@ object StompCodec extends Log {
       line = read_line
     }
 
-    if( message.direct_buffer==null ) {
-      new StompFrameMessage(new StompFrame(action, headers.toList, BufferContent(buffer)))
+    if( direct_buffer==null ) {
+      new StompFrame(action, headers.toList, BufferContent(buffer))
     } else {
-      new StompFrameMessage(new StompFrame(action, headers.toList, ZeroCopyContent(message.direct_buffer)))
+      new StompFrame(action, headers.toList, ZeroCopyContent(direct_buffer))
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Aug 14 18:47:56 2012
@@ -74,18 +74,20 @@ object StompProtocolHandler extends Log 
     def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
   }
 
-}
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class StompProtocolHandler extends ProtocolHandler {
-  import StompProtocolHandler._
-
-  var connection_log:Log = StompProtocolHandler
+  def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
+    val i = headers.iterator
+    while( i.hasNext ) {
+      val entry = i.next
+      if( entry._1 == name ) {
+        return Some(entry._2)
+      }
+    }
+    None
+  }
 
-  def protocol = "stomp"
-  def broker = connection.connector.broker
+  def get(headers:HeaderMap, names:List[AsciiBuffer]):List[Option[AsciiBuffer]] = {
+    names.map(x=>get(headers, x))
+  }
 
   def decode_header(value:Buffer):String = {
     var rc = new ByteArrayOutputStream(value.length)
@@ -109,7 +111,7 @@ class StompProtocolHandler extends Proto
     new String(rc.toByteArray, "UTF-8")
   }
 
-  def encode_header(value:String) = {
+  def encode_header(value:String, protocol_version:AsciiBuffer=V1_1):AsciiBuffer = {
     protocol_version match {
       case null => utf8(value).ascii
       case V1_0 => utf8(value).ascii
@@ -128,6 +130,21 @@ class StompProtocolHandler extends Proto
     }
   }
 
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class StompProtocolHandler extends ProtocolHandler {
+  import StompProtocolHandler._
+
+  var connection_log:Log = StompProtocolHandler
+
+  def encode_header(value:String):AsciiBuffer = StompProtocolHandler.encode_header(value, protocol_version)
+
+  def protocol = "stomp"
+  def broker = connection.connector.broker
+
   protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
   
   def id(message:Message) = {
@@ -1033,21 +1050,6 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  def get(headers:HeaderMap, names:List[AsciiBuffer]):List[Option[AsciiBuffer]] = {
-    names.map(x=>get(headers, x))
-  }
-
-  def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
-    val i = headers.iterator
-    while( i.hasNext ) {
-      val entry = i.next
-      if( entry._1 == name ) {
-        return Some(entry._2)
-      }
-    }
-    None
-  }
-
   def on_stomp_send(frame:StompFrame) = {
     messages_received += 1
 
@@ -1119,7 +1121,7 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  var message_id_counter = 0;
+  var message_id_counter = 0L
 
   def encode_address(value: Array[_ <: DestinationAddress]): String = {
     destination_parser.encode_destination(value)

Added: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala?rev=1373023&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala Tue Aug 14 18:47:56 2012
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp
+
+import org.apache.activemq.apollo.broker.protocol._
+import org.apache.activemq.apollo.broker._
+import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.stomp.Stomp._
+import org.apache.activemq.apollo.stomp.dto.StompDTO
+import org.fusesource.hawtbuf.Buffer._
+import scala.Some
+import org.apache.activemq.apollo.broker.protocol.UdpMessage
+import org.apache.activemq.apollo.broker.security.SecurityContext
+
+/**
+ * A connection-less variant of the STOMP protocol: every UDP datagram is
+ * decoded as a single STOMP frame and addressed using the frame's
+ * destination header.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class StompUdpProtocol extends UdpProtocol {
+
+  override def id = "stomp-udp"
+
+  override def createProtocolHandler = new UdpProtocolHandler {
+
+    type ConfigTypeDTO = StompDTO
+
+    var config:ConfigTypeDTO = _
+    var destination_parser = Stomp.destination_parser
+    var protocol_filters = List[ProtocolFilter2]()
+    var message_id_counter = 0L
+    var default_virtual_host:VirtualHost = _
+
+    /**
+     * Applies the connector's STOMP settings, using a default StompDTO when
+     * none is supplied.  A private copy of the destination parser is only
+     * made when at least one destination syntax option is customized.
+     */
+    override def configure(c: Option[ConfigTypeDTO]) = {
+      config = c.getOrElse(new ConfigTypeDTO)
+      import collection.JavaConversions._
+      default_virtual_host = broker.default_virtual_host
+      protocol_filters = ProtocolFilter2.create_filters(config.protocol_filters.toList, this)
+
+
+//      Option(config.max_data_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_data_length = _ )
+//      Option(config.max_header_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_header_length = _ )
+//      config.max_headers.foreach( codec.max_headers = _ )
+
+      // The temp prefixes take part in this check so that a configuration
+      // which customizes only them is not silently ignored.
+      if( config.queue_prefix!=null ||
+          config.topic_prefix!=null ||
+          config.temp_queue_prefix!=null ||
+          config.temp_topic_prefix!=null ||
+          config.destination_separator!=null ||
+          config.path_separator!= null ||
+          config.any_child_wildcard != null ||
+          config.any_descendant_wildcard!= null ||
+          config.regex_wildcard_start!= null ||
+          config.regex_wildcard_end!= null
+      ) {
+
+        destination_parser = new DestinationParser().copy(Stomp.destination_parser)
+        if( config.queue_prefix!=null ) { destination_parser.queue_prefix = config.queue_prefix }
+        if( config.topic_prefix!=null ) { destination_parser.topic_prefix = config.topic_prefix }
+        if( config.temp_queue_prefix!=null ) { destination_parser.temp_queue_prefix = config.temp_queue_prefix }
+        if( config.temp_topic_prefix!=null ) { destination_parser.temp_topic_prefix = config.temp_topic_prefix }
+        if( config.destination_separator!=null ) { destination_parser.destination_separator = config.destination_separator }
+        if( config.path_separator!=null ) { destination_parser.path_separator = config.path_separator }
+        if( config.any_child_wildcard!=null ) { destination_parser.any_child_wildcard = config.any_child_wildcard }
+        if( config.any_descendant_wildcard!=null ) { destination_parser.any_descendant_wildcard = config.any_descendant_wildcard }
+        if( config.regex_wildcard_start!=null ) { destination_parser.regex_wildcard_start = config.regex_wildcard_start }
+        if( config.regex_wildcard_end!=null ) { destination_parser.regex_wildcard_end = config.regex_wildcard_end }
+
+      }
+
+    }
+
+    /**
+     * Parses a STOMP destination string into broker addresses.
+     *
+     * @throws ProtocolException if the destination can not be parsed
+     */
+    def decode_address(dest:String):Array[SimpleAddress] = {
+      val rc = destination_parser.decode_multi_destination(dest)
+      if( rc==null ) {
+        throw new ProtocolException("Invalid stomp destination name: "+dest);
+      }
+      rc
+    }
+
+    /**
+     * Builds a security context from the login and passcode headers of the
+     * frame.  Returns the context along with a copy of the frame that has
+     * those headers removed, or (null, frame) when neither header is present.
+     */
+    def build_security_context(udp: UdpMessage, frame:StompFrame):(SecurityContext,StompFrame) = {
+      import StompProtocolHandler._
+      var headers = frame.headers
+      val login = get(headers, LOGIN)
+      val passcode = get(headers, PASSCODE)
+      if( login.isDefined || passcode.isDefined ) {
+        val sc = new SecurityContext
+        sc.connector_id = connection.connector.id
+        sc.local_address = connection.transport.getLocalAddress
+        sc.session_id = session_id
+        for( value <- login ) {
+          sc.user = value.toString
+          headers = headers.filterNot( _._1 == LOGIN)
+        }
+        for( value <- passcode ) {
+          sc.password = value.toString
+          headers = headers.filterNot( _._1 == PASSCODE)
+        }
+        (sc, frame.copy(headers=headers))
+      } else {
+        (null, frame)
+      }
+    }
+
+    /**
+     * Decodes a datagram into a routable message.  Returns None when
+     * decoding fails with an exception, when a protocol filter drops the
+     * frame, or when the frame has no destination header.
+     */
+    def decode(udp: UdpMessage):Option[DecodedUdpMessage] = {
+      import StompProtocolHandler._
+
+      try {
+        var frame = StompCodec.decode_frame(new Buffer(udp.buffer))
+        frame = if(!protocol_filters.isEmpty) {
+          var cur = Option(frame)
+          protocol_filters.foreach { filter =>
+            cur = cur.flatMap(filter.filter_inbound(_))
+          }
+          cur match {
+            case Some(frame) => frame
+            case None => return None
+          }
+        } else {
+          frame
+        }
+
+        val virtual_host = get(frame.headers, HOST) match {
+          case Some(host) => broker.cow_virtual_hosts_by_hostname.get(host).getOrElse(default_virtual_host)
+          case None => default_virtual_host
+        }
+
+        val (sc, updated_frame) = build_security_context(udp, frame)
+        frame = updated_frame
+
+        // A frame without a destination can not be routed, so it is dropped.
+        val dest = get(frame.headers, DESTINATION) match {
+          case Some(value) => value.deepCopy().ascii()
+          case None => return None
+        }
+
+        Some(new DecodedUdpMessage {
+          def message = udp
+          def host = virtual_host
+          def security_context = sc
+          def address = dest
+
+          def delivery = {
+
+            // Apply header updates...
+            val updated_frame = updated_headers(frame.headers, security_context) match {
+              case Nil=> frame.copy(action=MESSAGE)
+              case updated_headers => frame.copy(action=MESSAGE, updated_headers=updated_headers)
+            }
+
+            var message: StompFrameMessage = new StompFrameMessage(updated_frame)
+            val delivery = new Delivery
+            delivery.size = updated_frame.size
+            delivery.message = message
+            delivery.expiration = message.expiration
+            delivery.persistent = message.persistent
+            get(updated_frame.headers, RETAIN).foreach { retain =>
+              delivery.retain = retain match {
+                case SET => RetainSet
+                case REMOVE => RetainRemove
+                case _ => RetainIgnore
+              }
+            }
+            delivery
+          }
+        })
+
+      } catch {
+        case e:Exception =>
+          // Datagrams are untrusted input: a bad one is dropped instead of
+          // failing the handler, but the cause is logged to aid debugging.
+          StompProtocolHandler.debug(e, "Could not decode the STOMP frame of a UDP message")
+          None
+      }
+
+    }
+
+    /**
+     * Computes the headers to add to a frame before it is delivered: a
+     * message id when the frame has none, and the timestamp and user headers
+     * enabled in the configuration.
+     */
+    def updated_headers(headers:HeaderMap, security_context:SecurityContext) = {
+      import StompProtocolHandler._
+      import collection.JavaConversions._
+
+      var rc:HeaderMap=Nil
+      val host = default_virtual_host
+
+      // Do we need to add the message id?
+      if( get( headers, MESSAGE_ID) == None ) {
+        message_id_counter += 1
+        rc ::= (MESSAGE_ID -> ascii(session_id.get+message_id_counter))
+      }
+
+      if( config.add_timestamp_header!=null ) {
+        rc ::= (encode_header(config.add_timestamp_header), ascii(broker.now.toString()))
+      }
+
+      // Do we need to add the user id?  The security context is null for
+      // frames without login/passcode headers; those get no user headers.
+      if( host.authenticator!=null && security_context!=null ) {
+        if( config.add_user_header!=null ) {
+          host.authenticator.user_name(security_context).foreach{ name=>
+            rc ::= (encode_header(config.add_user_header), encode_header(name))
+          }
+        }
+        if( !config.add_user_headers.isEmpty ){
+          config.add_user_headers.foreach { h =>
+            val matches = security_context.principals(Option(h.kind).getOrElse("*"))
+            if( !matches.isEmpty ) {
+              h.separator match {
+                case null=>
+                  rc ::= (encode_header(h.name.trim), encode_header(matches.head.getName))
+                case separator =>
+                  rc ::= (encode_header(h.name.trim), encode_header(matches.map(_.getName).mkString(separator)))
+              }
+            }
+          }
+        }
+      }
+
+      rc
+    }
+
+  }
+}

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml Tue Aug 14 18:47:56 2012
@@ -43,5 +43,6 @@
   <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
   <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
+  <connector id="stomp-udp" bind="udp://0.0.0.0:0" protocol="stomp-udp"/>
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Tue Aug 14 18:47:56 2012
@@ -43,5 +43,6 @@
   <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
   <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
+  <connector id="stomp-udp" bind="udp://0.0.0.0:0" protocol="stomp-udp"/>
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Tue Aug 14 18:47:56 2012
@@ -37,5 +37,6 @@
   <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
   <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
+  <connector id="stomp-udp" bind="udp://0.0.0.0:0" protocol="stomp-udp"/>
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Tue Aug 14 18:47:56 2012
@@ -169,6 +169,32 @@ class StompParallelTest extends StompTes
     assert_received("Hello")
   }
 
+  test("STOMP UDP to STOMP interop") {
+
+    connect("1.1")
+    subscribe("0", "/topic/some-other-udp")
+
+    val udp_port: Int = connector_port("stomp-udp").get
+    println("The UDP port is: "+udp_port)
+
+    val target = new InetSocketAddress("127.0.0.1", udp_port)
+    val channel = DatagramChannel.open()
+    try {
+      channel.send(new AsciiBuffer(
+        "SEND\n" +
+        "destination:/topic/some-other-udp\n" +
+        "login:admin\n" +
+        "passcode:password\n" +
+        "\n" +
+        "Hello STOMP-UDP").toByteBuffer, target)
+    } finally {
+      // Release the datagram socket even if the send fails.
+      channel.close()
+    }
+
+    assert_received("Hello STOMP-UDP")
+  }
+
   /**
    * These disconnect tests assure that we don't drop message deliviers that are in flight
    * if a client disconnects before those deliveries are accepted by the target destination.