You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/14 20:47:56 UTC
svn commit: r1373023 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/
apollo-stomp/src/main/scala/org/apache/act...
Author: chirino
Date: Tue Aug 14 18:47:56 2012
New Revision: 1373023
URL: http://svn.apache.org/viewvc?rev=1373023&view=rev
Log:
Implements APLO-242 - Support STOMP frames over UDP
Added:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Tue Aug 14 18:47:56 2012
@@ -16,18 +16,19 @@
*/
package org.apache.activemq.apollo.broker.protocol
-import org.apache.activemq.apollo.broker.store.MessageRecord
import org.fusesource.hawtdispatch.transport.ProtocolCodec
import java.nio.ByteBuffer
import org.fusesource.hawtdispatch._
import java.nio.channels.{DatagramChannel, WritableByteChannel, ReadableByteChannel}
import java.net.SocketAddress
-import org.apache.activemq.apollo.dto.{UdpDTO, AcceptingConnectorDTO}
+import org.apache.activemq.apollo.dto.{ProtocolDTO, UdpDTO, AcceptingConnectorDTO}
import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
import java.util.Map.Entry
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.broker._
import org.apache.activemq.apollo.broker.security.SecurityContext
+import scala.Some
+import org.apache.activemq.apollo.broker.BlackHoleSink
case class UdpMessage(from:SocketAddress, buffer:ByteBuffer)
@@ -108,6 +109,9 @@ trait DecodedUdpMessage {
* null if you want to bypass authentication and authorization.
*/
def security_context:SecurityContext
+
+ /**
+  * Number of bytes currently remaining in the datagram's buffer. The handler
+  * adds and subtracts this value when tracking its inbound queue size.
+  */
+ def size:Int = message.buffer.remaining
+
}
/**
@@ -115,13 +119,12 @@ trait DecodedUdpMessage {
*/
abstract class UdpProtocolHandler extends ProtocolHandler {
import UdpProtocolHandler._
-
+ type ConfigTypeDTO <: ProtocolDTO
def protocol = "udp"
- def session_id = None
+ var session_id:Option[String] = None
- var buffer_size = 64*1024
+ var buffer_size = 640*1024
var connection_log:Log = _
- var config:UdpDTO = _
var messages_received = 0L
var waiting_on = "client request"
@@ -135,23 +138,21 @@ abstract class UdpProtocolHandler extend
rc
}
- def configure(config:UdpDTO) = {
- this.config = config
- buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
- }
+ /**
+  * Configuration hook called from on_transport_connected with the first
+  * matching protocol DTO found on the connector, if any. The base
+  * implementation does nothing; subclasses override it to apply settings.
+  */
+ def configure(config:Option[ConfigTypeDTO]):Unit = ()
override def on_transport_connected = {
connection.transport.resumeRead
import collection.JavaConversions._
+ session_id = Some("%s-%x".format(broker.default_virtual_host.config.id, broker.default_virtual_host.session_counter.incrementAndGet))
- configure((connection.connector.config match {
+ configure(connection.connector.config match {
case connector_config:AcceptingConnectorDTO =>
connector_config.protocols.flatMap{ _ match {
- case x:UdpDTO => Some(x)
+ case x:ConfigTypeDTO => Some(x)
case _ => None
}}.headOption
case _ => None
- }).getOrElse(new UdpDTO) )
+ })
}
def suspend_read(reason: String) = {
@@ -175,7 +176,7 @@ abstract class UdpProtocolHandler extend
}
}
- override def on_transport_command(command: AnyRef) = {
+ override def on_transport_command(command: AnyRef):Unit = {
decode(command.asInstanceOf[UdpMessage]) match {
case Some(msg) =>
messages_received += 1
@@ -188,7 +189,14 @@ abstract class UdpProtocolHandler extend
var sc_key = if( security_context!=null) security_context.to_key else null
var route = producerRoutes.get((host, address, sc_key));
if( route == null ) {
- route = new UdpProducerRoute(host, address)
+ try {
+ route = new UdpProducerRoute(host, address)
+ } catch {
+ case e =>
+ // We could run into a error like the address not parsing
+ debug(e, "Could not create the producer route")
+ return
+ }
producerRoutes.put((host, address, sc_key), route)
def fail_connect = {
@@ -199,6 +207,7 @@ abstract class UdpProtocolHandler extend
def continue_connect = host.dispatch_queue {
host.router.connect(route.addresses, route, security_context) match {
case Some(error) => queue {
+ debug("Could not connect the producer route: "+error)
fail_connect
}
case None =>
@@ -212,7 +221,9 @@ abstract class UdpProtocolHandler extend
resume_read
auth_failure match {
case null=> continue_connect
- case auth_failure=> fail_connect
+ case auth_failure=>
+ debug("Producer route failed authentication: "+auth_failure)
+ fail_connect
}
}
}
@@ -239,8 +250,8 @@ abstract class UdpProtocolHandler extend
val sink_switch = new MutableSink[Delivery]()
- val inbound_queue = new OverflowSink[Delivery](sink_switch) {
- override protected def onDelivered(value: Delivery) = {
+ val inbound_queue = new OverflowSink[DecodedUdpMessage](sink_switch.map(_.delivery)) {
+ override protected def onDelivered(value: DecodedUdpMessage) = {
inbound_queue_size -= value.size
}
}
@@ -255,9 +266,8 @@ abstract class UdpProtocolHandler extend
inbound_queue.removeFirst
}
- val delivery = frame.delivery
- inbound_queue_size += delivery.size
- inbound_queue.offer(delivery)
+ inbound_queue_size += frame.size
+ inbound_queue.offer(frame)
}
}
@@ -278,12 +288,14 @@ class UdpProtocol extends BaseProtocol {
def createProtocolCodec:ProtocolCodec = new UdpProtocolCodec()
def createProtocolHandler:ProtocolHandler = new UdpProtocolHandler {
+ type ConfigTypeDTO = UdpDTO
var default_host:VirtualHost = _
var topic_address:AsciiBuffer = _
- override def configure(config: UdpDTO) {
- super.configure(config)
+ override def configure(c: Option[ConfigTypeDTO]) = {
+ val config = c.getOrElse(new ConfigTypeDTO)
+ buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
val topic_address_decoded = decode_address(Option(config.topic).getOrElse("udp"))
topic_address = new AsciiBuffer(LocalRouter.destination_parser.encode_destination(topic_address_decoded))
default_host = broker.default_virtual_host
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Tue Aug 14 18:47:56 2012
@@ -14,4 +14,5 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.stomp.StompProtocol
\ No newline at end of file
+org.apache.activemq.apollo.stomp.StompProtocol
+org.apache.activemq.apollo.stomp.StompUdpProtocol
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Tue Aug 14 18:47:56 2012
@@ -26,7 +26,7 @@ import java.io.{DataOutput, IOException}
import org.fusesource.hawtdispatch.transport._
import _root_.org.fusesource.hawtbuf._
import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.apache.activemq.apollo.broker.store.{DirectBuffer, MessageRecord}
import java.lang.ThreadLocal
import java.util.ArrayList
import collection.mutable.{ListBuffer, HashMap}
@@ -88,8 +88,10 @@ object StompCodec extends Log {
}
def decode(message: MessageRecord):StompFrameMessage = {
+ new StompFrameMessage(decode_frame(message.buffer, message.direct_buffer))
+ }
- val buffer = message.buffer.buffer
+ def decode_frame(buffer: Buffer, direct_buffer:DirectBuffer=null):StompFrame = {
def read_line = {
val pos = buffer.indexOf('\n'.toByte)
if( pos<0 ) {
@@ -124,10 +126,10 @@ object StompCodec extends Log {
line = read_line
}
- if( message.direct_buffer==null ) {
- new StompFrameMessage(new StompFrame(action, headers.toList, BufferContent(buffer)))
+ if( direct_buffer==null ) {
+ new StompFrame(action, headers.toList, BufferContent(buffer))
} else {
- new StompFrameMessage(new StompFrame(action, headers.toList, ZeroCopyContent(message.direct_buffer)))
+ new StompFrame(action, headers.toList, ZeroCopyContent(direct_buffer))
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Aug 14 18:47:56 2012
@@ -74,18 +74,20 @@ object StompProtocolHandler extends Log
def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
}
-}
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class StompProtocolHandler extends ProtocolHandler {
- import StompProtocolHandler._
-
- var connection_log:Log = StompProtocolHandler
+ /**
+  * Looks up a header by name.
+  *
+  * @return the value of the first header whose name equals `name`, or None
+  *         when no header matches
+  */
+ def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
+   headers.find( header => header._1 == name ).map( header => header._2 )
+ }
- def protocol = "stomp"
- def broker = connection.connector.broker
+ /**
+  * Looks up several headers at once; the result holds one entry per
+  * requested name, in the same order as `names`.
+  */
+ def get(headers:HeaderMap, names:List[AsciiBuffer]):List[Option[AsciiBuffer]] = {
+   for( name <- names ) yield get(headers, name)
+ }
def decode_header(value:Buffer):String = {
var rc = new ByteArrayOutputStream(value.length)
@@ -109,7 +111,7 @@ class StompProtocolHandler extends Proto
new String(rc.toByteArray, "UTF-8")
}
- def encode_header(value:String) = {
+ def encode_header(value:String, protocol_version:AsciiBuffer=V1_1):AsciiBuffer = {
protocol_version match {
case null => utf8(value).ascii
case V1_0 => utf8(value).ascii
@@ -128,6 +130,21 @@ class StompProtocolHandler extends Proto
}
}
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class StompProtocolHandler extends ProtocolHandler {
+ import StompProtocolHandler._
+
+ var connection_log:Log = StompProtocolHandler
+
+ def encode_header(value:String):AsciiBuffer = StompProtocolHandler.encode_header(value, protocol_version)
+
+ def protocol = "stomp"
+ def broker = connection.connector.broker
+
protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
def id(message:Message) = {
@@ -1033,21 +1050,6 @@ class StompProtocolHandler extends Proto
}
}
- def get(headers:HeaderMap, names:List[AsciiBuffer]):List[Option[AsciiBuffer]] = {
- names.map(x=>get(headers, x))
- }
-
- def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
- val i = headers.iterator
- while( i.hasNext ) {
- val entry = i.next
- if( entry._1 == name ) {
- return Some(entry._2)
- }
- }
- None
- }
-
def on_stomp_send(frame:StompFrame) = {
messages_received += 1
@@ -1119,7 +1121,7 @@ class StompProtocolHandler extends Proto
}
}
- var message_id_counter = 0;
+ var message_id_counter = 0L
def encode_address(value: Array[_ <: DestinationAddress]): String = {
destination_parser.encode_destination(value)
Added: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala?rev=1373023&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompUdpProtocol.scala Tue Aug 14 18:47:56 2012
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp
+
+import org.apache.activemq.apollo.broker.protocol._
+import org.apache.activemq.apollo.broker._
+import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.stomp.Stomp._
+import org.apache.activemq.apollo.stomp.dto.StompDTO
+import org.fusesource.hawtbuf.Buffer._
+import scala.Some
+import org.apache.activemq.apollo.broker.protocol.UdpMessage
+import org.apache.activemq.apollo.broker.security.SecurityContext
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class StompUdpProtocol extends UdpProtocol {
+
+ /** Protocol identifier; the test connectors select it with protocol="stomp-udp". */
+ override def id = {
+   "stomp-udp"
+ }
+
+ override def createProtocolHandler = new UdpProtocolHandler {
+
+ type ConfigTypeDTO = StompDTO
+
+ var config:ConfigTypeDTO = _
+ var destination_parser = Stomp.destination_parser
+ var protocol_filters = List[ProtocolFilter2]()
+ var message_id_counter = 0L
+ var default_virtual_host:VirtualHost = _
+
+ /**
+  * Applies the connector's STOMP settings to this handler.
+  *
+  * Falls back to a default StompDTO when the connector supplies no matching
+  * protocol configuration, builds the protocol filter chain, and installs a
+  * private copy of the STOMP destination parser when any parser setting is
+  * overridden in the configuration.
+  *
+  * @param c the protocol configuration found on the connector, if any
+  */
+ override def configure(c: Option[ConfigTypeDTO]) = {
+   config = c.getOrElse(new ConfigTypeDTO)
+   import collection.JavaConversions._
+   default_virtual_host = broker.default_virtual_host
+   protocol_filters = ProtocolFilter2.create_filters(config.protocol_filters.toList, this)
+
+
+// Option(config.max_data_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_data_length = _ )
+// Option(config.max_header_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_header_length = _ )
+// config.max_headers.foreach( codec.max_headers = _ )
+
+   // Every setting that is copied onto the parser below must also take part
+   // in this check; otherwise a configuration that overrides only that
+   // setting would be ignored and the shared default parser kept.
+   if( config.queue_prefix!=null ||
+       config.topic_prefix!=null ||
+       config.temp_queue_prefix!=null ||
+       config.temp_topic_prefix!=null ||
+       config.destination_separator!=null ||
+       config.path_separator!= null ||
+       config.any_child_wildcard != null ||
+       config.any_descendant_wildcard!= null ||
+       config.regex_wildcard_start!= null ||
+       config.regex_wildcard_end!= null
+   ) {
+
+     // Work on a copy so the shared Stomp.destination_parser is never mutated.
+     destination_parser = new DestinationParser().copy(Stomp.destination_parser)
+     if( config.queue_prefix!=null ) { destination_parser.queue_prefix = config.queue_prefix }
+     if( config.topic_prefix!=null ) { destination_parser.topic_prefix = config.topic_prefix }
+     if( config.temp_queue_prefix!=null ) { destination_parser.temp_queue_prefix = config.temp_queue_prefix }
+     if( config.temp_topic_prefix!=null ) { destination_parser.temp_topic_prefix = config.temp_topic_prefix }
+     if( config.destination_separator!=null ) { destination_parser.destination_separator = config.destination_separator }
+     if( config.path_separator!=null ) { destination_parser.path_separator = config.path_separator }
+     if( config.any_child_wildcard!=null ) { destination_parser.any_child_wildcard = config.any_child_wildcard }
+     if( config.any_descendant_wildcard!=null ) { destination_parser.any_descendant_wildcard = config.any_descendant_wildcard }
+     if( config.regex_wildcard_start!=null ) { destination_parser.regex_wildcard_start = config.regex_wildcard_start }
+     if( config.regex_wildcard_end!=null ) { destination_parser.regex_wildcard_end = config.regex_wildcard_end }
+
+   }
+
+ }
+
+ /**
+  * Parses a STOMP destination string with the configured destination parser.
+  *
+  * @throws ProtocolException when the parser returns null for the name
+  */
+ def decode_address(dest:String):Array[SimpleAddress] = {
+   Option(destination_parser.decode_multi_destination(dest.toString)).getOrElse {
+     throw new ProtocolException("Invalid stomp destination name: "+dest)
+   }
+ }
+
+ /**
+  * Builds the security context for a frame from its login/passcode headers.
+  *
+  * @return the security context paired with a copy of the frame that has the
+  *         login and passcode headers removed; when the frame carries neither
+  *         header the context is null and the frame is returned unchanged
+  */
+ def build_security_context(udp: UdpMessage, frame:StompFrame):(SecurityContext,StompFrame) = {
+   import StompProtocolHandler._
+   val login = get(frame.headers, LOGIN)
+   val passcode = get(frame.headers, PASSCODE)
+   if( login.isEmpty && passcode.isEmpty ) {
+     (null, frame)
+   } else {
+     val sc = new SecurityContext
+     sc.connector_id = connection.connector.id
+     sc.local_address = connection.transport.getLocalAddress
+     sc.session_id = session_id
+     login.foreach { value => sc.user = value.toString }
+     passcode.foreach { value => sc.password = value.toString }
+     // Credentials are consumed here and not forwarded with the frame.
+     val remaining = frame.headers.filterNot { header =>
+       header._1 == LOGIN || header._1 == PASSCODE
+     }
+     (sc, frame.copy(headers=remaining))
+   }
+ }
+
+ /**
+  * Decodes a datagram holding a single STOMP frame into a routable message.
+  *
+  * The frame is parsed, passed through the configured inbound protocol
+  * filters, matched to a virtual host through its host header (falling back
+  * to the default virtual host) and stripped of its login/passcode headers.
+  *
+  * @param udp the received datagram
+  * @return the decoded message, or None when a filter drops the frame, the
+  *         frame has no destination header, or the datagram cannot be decoded
+  */
+ def decode(udp: UdpMessage):Option[DecodedUdpMessage] = {
+   import StompProtocolHandler._
+
+   try {
+     val parsed = StompCodec.decode_frame(new Buffer(udp.buffer))
+
+     // Thread the frame through every inbound filter in order; once a filter
+     // drops the frame the remaining filters are not invoked.
+     val filtered = protocol_filters.foldLeft(Option(parsed)) { (cur, filter) =>
+       cur.flatMap(filter.filter_inbound(_))
+     }
+
+     filtered.flatMap { filtered_frame =>
+
+       val virtual_host = get(filtered_frame.headers, HOST) match {
+         case Some(host) => broker.cow_virtual_hosts_by_hostname.get(host).getOrElse(default_virtual_host)
+         case None => default_virtual_host
+       }
+
+       val (sc, frame) = build_security_context(udp, filtered_frame)
+
+       // A frame without a destination header cannot be routed, so it is dropped.
+       get(frame.headers, DESTINATION).map { destination =>
+         val dest = destination.deepCopy().ascii()
+
+         val decoded:DecodedUdpMessage = new DecodedUdpMessage {
+           def message = udp
+           def host = virtual_host
+           def security_context = sc
+           def address = dest
+
+           def delivery = {
+
+             // Apply header updates...
+             val updated_frame = updated_headers(frame.headers, security_context) match {
+               case Nil=> frame.copy(action=MESSAGE)
+               case extra_headers => frame.copy(action=MESSAGE, updated_headers=extra_headers)
+             }
+
+             val stomp_message = new StompFrameMessage(updated_frame)
+             val delivery = new Delivery
+             delivery.size = updated_frame.size
+             delivery.message = stomp_message
+             delivery.expiration = stomp_message.expiration
+             delivery.persistent = stomp_message.persistent
+             get(updated_frame.headers, RETAIN).foreach { retain =>
+               delivery.retain = retain match {
+                 case SET => RetainSet
+                 case REMOVE => RetainRemove
+                 case _ => RetainIgnore
+               }
+             }
+             delivery
+           }
+         }
+         decoded
+       }
+     }
+
+   } catch {
+     // Datagrams are untrusted input: a malformed frame is dropped, but the
+     // reason is logged. Errors (e.g. OutOfMemoryError) are left to propagate.
+     case e:Exception =>
+       StompProtocolHandler.debug(e, "Could not decode the STOMP UDP frame")
+       None
+   }
+
+ }
+
+ /**
+  * Computes the headers that must be added to a frame before it is delivered.
+  *
+  * Adds a message id when the frame has none, the configured timestamp
+  * header, and the configured user headers when the default virtual host has
+  * an authenticator and the frame was sent with credentials.
+  *
+  * @param headers the headers already present on the frame
+  * @param security_context the sender's security context; null when the frame
+  *                         carried no login/passcode headers
+  * @return the headers to add; Nil when nothing needs to be added
+  */
+ def updated_headers(headers:HeaderMap, security_context:SecurityContext) = {
+   import StompProtocolHandler._
+   import collection.JavaConversions._
+
+   var rc:HeaderMap=Nil
+   val host = default_virtual_host
+
+   // Do we need to add the message id?
+   if( get( headers, MESSAGE_ID) == None ) {
+     message_id_counter += 1
+     rc ::= (MESSAGE_ID -> ascii(session_id.get+message_id_counter))
+   }
+
+   if( config.add_timestamp_header!=null ) {
+     rc ::= (encode_header(config.add_timestamp_header), ascii(broker.now.toString()))
+   }
+
+   // Do we need to add the user id?  Frames sent without credentials have a
+   // null security context, so there is no user to describe; skipping the
+   // section avoids dereferencing null.
+   if( host.authenticator!=null && security_context!=null ) {
+     if( config.add_user_header!=null ) {
+       host.authenticator.user_name(security_context).foreach{ name=>
+         rc ::= (encode_header(config.add_user_header), encode_header(name))
+       }
+     }
+     if( !config.add_user_headers.isEmpty ){
+       config.add_user_headers.foreach { h =>
+         val matches = security_context.principals(Option(h.kind).getOrElse("*"))
+         if( !matches.isEmpty ) {
+           h.separator match {
+             case null=>
+               rc ::= (encode_header(h.name.trim), encode_header(matches.head.getName))
+             case separator =>
+               rc ::= (encode_header(h.name.trim), encode_header(matches.map(_.getName).mkString(separator)))
+           }
+         }
+       }
+     }
+   }
+
+   rc
+ }
+
+ }
+}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml Tue Aug 14 18:47:56 2012
@@ -43,5 +43,6 @@
<!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="tcp" bind="tcp://0.0.0.0:0"/>
<connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
+ <connector id="stomp-udp" bind="udp://0.0.0.0:0" protocol="stomp-udp"/>
</broker>
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Tue Aug 14 18:47:56 2012
@@ -43,5 +43,6 @@
<!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="tcp" bind="tcp://0.0.0.0:0"/>
<connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
+ <connector id="stomp-udp" bind="udp://0.0.0.0:0" protocol="stomp-udp"/>
</broker>
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Tue Aug 14 18:47:56 2012
@@ -37,5 +37,6 @@
<!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="tcp" bind="tcp://0.0.0.0:0"/>
<connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
+ <connector id="stomp-udp" bind="udp://0.0.0.0:0" protocol="stomp-udp"/>
</broker>
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1373023&r1=1373022&r2=1373023&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Tue Aug 14 18:47:56 2012
@@ -169,6 +169,27 @@ class StompParallelTest extends StompTes
assert_received("Hello")
}
+ // Sends one STOMP SEND frame as a UDP datagram to the stomp-udp connector
+ // and checks that a STOMP 1.1 subscriber on the same topic receives it.
+ test("STOMP UDP to STOMP interop") {
+
+   connect("1.1")
+   subscribe("0", "/topic/some-other-udp")
+
+   val udp_port: Int = connector_port("stomp-udp").get
+   val channel = DatagramChannel.open()
+   try {
+     println("The UDP port is: "+udp_port)
+
+     val target = new InetSocketAddress("127.0.0.1", udp_port)
+     channel.send(new AsciiBuffer(
+       "SEND\n" +
+       "destination:/topic/some-other-udp\n" +
+       "login:admin\n" +
+       "passcode:password\n" +
+       "\n" +
+       "Hello STOMP-UDP").toByteBuffer, target)
+   } finally {
+     // Release the datagram socket even when the send fails.
+     channel.close()
+   }
+
+   assert_received("Hello STOMP-UDP")
+ }
+
/**
* These disconnect tests ensure that we don't drop message deliveries that are in flight
* if a client disconnects before those deliveries are accepted by the target destination.