You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/14 20:47:40 UTC

svn commit: r1373022 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/proto/ apollo-broker/src/main/res...

Author: chirino
Date: Tue Aug 14 18:47:38 2012
New Revision: 1373022

URL: http://svn.apache.org/viewvc?rev=1373022&view=rev
Log:
Decoupled protocols from message codecs.  Simplified creating a protocol.  Should be simpler to customize UDP protocol handling.  Added support for auth and virtual host host selection to UDP protocols.

Added:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
      - copied, changed from r1373021, activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
      - copied, changed from r1373021, activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
      - copied, changed from r1373021, activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
Removed:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DetectDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index (from r1373021, activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index&p1=activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index&r1=1373021&r2=1373022&rev=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.stomp.StompProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.amqp.AmqpMessageCodecFactory
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.amqp.AmqpProtocolCodecFactory
\ No newline at end of file
+org.apache.activemq.apollo.amqp.AmqpProtocolCodec
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala Tue Aug 14 18:47:38 2012
@@ -36,7 +36,7 @@ object AmqpCodec extends Log {
     message match {
       case message:AMQPMessage =>
         val rc = new MessageRecord
-        rc.protocol = PROTOCOL_ID
+        rc.codec = PROTOCOL_ID
         rc.buffer = message.payload
         rc
       case _ => throw new RuntimeException("Invalid message type");

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala Tue Aug 14 18:47:38 2012
@@ -18,8 +18,7 @@ package org.apache.activemq.apollo.amqp
 
 import _root_.org.fusesource.hawtbuf._
 import org.apache.activemq.apollo.broker._
-import java.lang.String
-import protocol.{ProtocolCodecFactory, ProtocolFactory, Protocol}
+import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, MessageCodec, ProtocolCodecFactory, Protocol}
 import org.apache.activemq.apollo.broker.store._
 import AmqpCodec._
 import org.fusesource.amqp.codec.AMQPProtocolCodec
@@ -51,27 +50,24 @@ class AmqpProtocolCodecFactory extends P
   }
 }
 
-class AmqpProtocolFactory extends ProtocolFactory {
-
-  def create() = AmqpProtocol
-
-  def create(config: String) = if(config == PROTOCOL) {
-    AmqpProtocol
-  } else {
-    null
-  }
-
-}
-
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 object AmqpProtocol extends AmqpProtocolCodecFactory with Protocol {
-
   def createProtocolHandler = new AmqpProtocolHandler
+}
+
+object AmqpMessageCodecFactory extends MessageCodecFactory.Provider {
+  def create = Array[MessageCodec](AmqpMessageCodec)
+}
+
+  /**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object AmqpMessageCodec extends MessageCodec {
+  def id = PROTOCOL
   def encode(message: Message) = AmqpCodec.encode(message)
   def decode(message: MessageRecord) = AmqpCodec.decode(message)
-
 }
 
 

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Tue Aug 14 18:47:38 2012
@@ -54,7 +54,7 @@ object AMQPMessage {
 
 case class AMQPMessage(payload:Buffer) extends org.apache.activemq.apollo.broker.Message {
   import AmqpProtocolHandler._
-  def protocol = AmqpProtocol
+  def codec = AmqpMessageCodec
 
   var _annotated:Envelope = _
   def annotated = {
@@ -518,7 +518,7 @@ class AmqpProtocolHandler extends Protoc
 //          frame = frame.append_headers((include_seq.get, ascii(delivery.seq.toString))::Nil)
 //        }
 
-        var annotated = if( message.protocol eq AmqpProtocol ) {
+        var annotated = if( message.codec eq AmqpMessageCodec ) {
           val original = message.asInstanceOf[AMQPMessage].annotated
           var annotated = new Envelope
           annotated.setHeader(header)
@@ -530,8 +530,8 @@ class AmqpProtocolHandler extends Protoc
         } else {
           
           val (body, content_type) =  protocol_convert match{
-            case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.protocol.id()+";conv=body")
-            case _ => (message.encoded, "protocol/"+message.protocol.id())
+            case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.codec.id+";conv=body")
+            case _ => (message.encoded, "protocol/"+message.codec.id)
           }
           
           val bare = new types.Message

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Tue Aug 14 18:47:38 2012
@@ -24,7 +24,7 @@ option java_multiple_files = true;
 
 message MessagePB {
   required int64 messageKey=1;
-  required bytes protocol = 2 [java_override_type = "AsciiBuffer"];
+  required bytes codec = 2 [java_override_type = "AsciiBuffer"];
   optional int32 size = 3;
   optional bytes value = 4;
   optional sint64 expiration = 5;

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.protocol.RawMessageCodec
\ No newline at end of file
+org.apache.activemq.apollo.broker.protocol.RawMessageCodecFactory
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Tue Aug 14 18:47:38 2012
@@ -14,6 +14,5 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.protocol.AnyProtocolFactory
-org.apache.activemq.apollo.broker.protocol.UdpProtocolFactory
-org.apache.activemq.apollo.broker.protocol.RawProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.broker.protocol.AnyProtocol
+org.apache.activemq.apollo.broker.protocol.UdpProtocol

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Tue Aug 14 18:47:38 2012
@@ -272,10 +272,18 @@ class Broker() extends BaseService with 
     rc
   })
 
+  @volatile
   var default_virtual_host: VirtualHost = null
   val virtual_hosts = LinkedHashMap[AsciiBuffer, VirtualHost]()
   val virtual_hosts_by_hostname = new LinkedHashMap[AsciiBuffer, VirtualHost]()
 
+  /**
+   * This is a copy of the virtual_hosts_by_hostname variable which
+   * can be accessed by any thread.
+   */
+  @volatile
+  var cow_virtual_hosts_by_hostname = Map[AsciiBuffer, VirtualHost]()
+
   val connectors = LinkedHashMap[String, Connector]()
   val connections = LinkedHashMap[Long, BrokerConnection]()
 
@@ -389,6 +397,7 @@ class Broker() extends BaseService with 
     )
     virtual_hosts.clear()
     virtual_hosts_by_hostname.clear()
+    cow_virtual_hosts_by_hostname = Map()
 
     Option(web_server).foreach(tracker.stop(_))
     web_server = null
@@ -541,6 +550,8 @@ class Broker() extends BaseService with 
       }
     }
 
+    cow_virtual_hosts_by_hostname = virtual_hosts_by_hostname.toMap
+
     // first defined host is the default virtual host
     config.virtual_hosts.headOption.map(x=>ascii(x.id)).foreach { id =>
       default_virtual_host = virtual_hosts.get(id).getOrElse(null)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Tue Aug 14 18:47:38 2012
@@ -17,7 +17,7 @@
 package org.apache.activemq.apollo.broker
 
 import org.fusesource.hawtdispatch._
-import protocol.{ProtocolFactory, Protocol}
+import org.apache.activemq.apollo.broker.protocol.{ProtocolFactory, Protocol}
 import org.fusesource.hawtdispatch.transport._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.OptionSupport._

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Tue Aug 14 18:47:38 2012
@@ -23,7 +23,7 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.util.Log
 import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
 import org.apache.activemq.apollo.dto.DestinationDTO
-import org.apache.activemq.apollo.broker.protocol.{ProtocolFactory, Protocol}
+import org.apache.activemq.apollo.broker.protocol.{MessageCodec, Protocol, ProtocolFactory}
 import scala.Array
 
 object DeliveryProducer extends Log
@@ -99,11 +99,11 @@ trait DeliverySession extends SessionSin
 trait Message extends Filterable with Retained {
 
   /**
-   * The protocol of the message
+   * The encoder/decoder of the message
    */
-  def protocol:Protocol
+  def codec:MessageCodec
 
-  def encoded:Buffer = protocol.encode(this).buffer
+  def encoded:Buffer = codec.encode(this).buffer
   
 }
 
@@ -243,7 +243,7 @@ class Delivery {
   }
 
   def createMessageRecord() = {
-    val record = message.protocol.encode(message)
+    val record = message.codec.encode(message)
     record.locator = storeLocator
     record
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Tue Aug 14 18:47:38 2012
@@ -17,7 +17,7 @@
 package org.apache.activemq.apollo.broker
 
 import org.fusesource.hawtdispatch._
-import protocol.ProtocolFactory
+import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, ProtocolFactory, Protocol}
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
@@ -732,7 +732,7 @@ class QueueEntry(val queue:Queue, val se
         queue.swapping_in_size -= size
 
         val delivery = to_delivery
-        delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
+        delivery.message = MessageCodecFactory(messageRecord.codec.toString).get.decode(messageRecord)
 
         space += delivery
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Tue Aug 14 18:47:38 2012
@@ -70,7 +70,12 @@ abstract class Sink[T] {
       }
     }
   }
+}
 
+case class BlackHoleSink[T]() extends Sink[T] {
+  var refiller:Task = null
+  def full = false
+  def offer(value: T) = true
 }
 
 trait SinkFilter[T] {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Tue Aug 14 18:47:38 2012
@@ -35,61 +35,20 @@ import transport.{Transport, TransportAw
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class AnyProtocolFactory extends ProtocolFactory {
-
-  def all_protocols: Array[Protocol] = ((ProtocolFactory.finder.singletons.map(_.create())).filter(_.isIdentifiable)).toArray
-
-  def create() = {
-    new AnyProtocol(()=>all_protocols)
-  }
-
-  def create(config: String): Protocol = {
-    val MULTI = "any"
-    val MULTI_PREFIXED = "any:"
-
-    if (config == MULTI) {
-      return new AnyProtocol(()=>all_protocols)
-    } else if (config.startsWith(MULTI_PREFIXED)) {
-      var names: Array[String] = config.substring(MULTI_PREFIXED.length).split(',')
-      var protocols: Array[Protocol] = (names.flatMap {x => ProtocolFactory.get(x.trim)}).toArray
-      return new AnyProtocol(()=>protocols)
-    }
-    return null
-  }
-
-}
-
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class AnyProtocol(val func: ()=>Array[Protocol]) extends Protocol {
-
-  lazy val protocols: Array[Protocol] = func()
+class AnyProtocol() extends BaseProtocol {
 
   def id = "any"
 
-  def createProtocolCodec = new AnyProtocolCodec(protocols)
+  def createProtocolCodec = new AnyProtocolCodec()
 
   def createProtocolHandler = new AnyProtocolHandler
-
-  def encode(message: Message) = throw new UnsupportedOperationException
-
-  def decode(message: MessageRecord) = throw new UnsupportedOperationException
-
-  def isIdentifiable = false
-
-  def maxIdentificaionLength = throw new UnsupportedOperationException()
-
-  def matchesIdentification(buffer: Buffer) = throw new UnsupportedOperationException()
-
 }
 
 case class ProtocolDetected(id:String, codec:ProtocolCodec)
 
-class AnyProtocolCodec(val protocols: Array[Protocol]) extends ProtocolCodec with TransportAware {
+class AnyProtocolCodec() extends ProtocolCodec with TransportAware {
+
+  var protocols =  ProtocolFactory.protocols.filter(_.isIdentifiable)
 
   if (protocols.isEmpty) {
     throw new IllegalArgumentException("No protocol configured for identification.")
@@ -190,13 +149,20 @@ class AnyProtocolHandler extends Protoco
     connection.transport.resumeRead
     import OptionSupport._
     import collection.JavaConversions._
-    
+
+    var codec = connection.transport.getProtocolCodec().asInstanceOf[AnyProtocolCodec]
+
     val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
     config = connector_config.protocols.flatMap{ _ match {
       case x:DetectDTO => Some(x)
       case _ => None
     }}.headOption.getOrElse(new DetectDTO)
-    
+
+    if( config.protocols!=null ) {
+      val protocols = Set(config.protocols.split("\\s+").filter( _.length!=0 ):_*)
+      codec.protocols = codec.protocols.filter(x=> protocols.contains(x.id))
+    }
+
     val timeout = config.timeout.getOrElse(5000L)
   
     // Make sure client connects eventually...

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Tue Aug 14 18:47:38 2012
@@ -22,12 +22,17 @@ import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.util.{Log, ClassFinder}
 import org.apache.activemq.apollo.broker.{Broker, Message, BrokerConnection}
 import org.apache.activemq.apollo.dto.{SimpleProtocolFilterDTO, ProtocolFilterDTO, ConnectionStatusDTO}
+import org.fusesource.hawtbuf.Buffer
+import scala.collection.mutable.ListBuffer
 
-trait Protocol {
-  def id:String
+trait Protocol extends ProtocolCodecFactory.Provider {
   def createProtocolHandler:ProtocolHandler
-  def encode(message:Message):MessageRecord
-  def decode(message:MessageRecord):Message
+}
+
+abstract class BaseProtocol extends Protocol {
+  def isIdentifiable = false
+  def maxIdentificaionLength = throw new UnsupportedOperationException()
+  def matchesIdentification(buffer: Buffer) = throw new UnsupportedOperationException()
 }
 
 /**
@@ -36,15 +41,37 @@ trait Protocol {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object Protocol {
+object ProtocolFactory {
 
-  val finder = new ClassFinder[Protocol]("META-INF/services/org.apache.activemq.apollo/protocol-factory.index",classOf[Protocol])
+  private def finder = new ClassFinder[Protocol]("META-INF/services/org.apache.activemq.apollo/protocol-factory.index", classOf[Protocol])
+  val protocols:Array[Protocol] = finder.singletons.toArray
+  val protocols_by_id = Map(protocols.map(x=> (x.id, x)): _*)
+  def get(name:String):Option[Protocol] = protocols_by_id.get(name)
 
-  val protocols = Map((for( provider <- finder.singletons ) {
-    yield (provider.id, provider)
-  }): _*)
+}
 
-  def get(name:String):Option[Protocol] = protocols.get(name)
+trait MessageCodec {
+  def id():String
+  def encode(message:Message):MessageRecord
+  def decode(message:MessageRecord):Message
+}
+
+object MessageCodecFactory {
+
+  trait Provider {
+    def create:Array[MessageCodec]
+  }
+
+  val codecs:Array[MessageCodec] = {
+    val finder = new ClassFinder[Provider]("META-INF/services/org.apache.activemq.apollo/message-codec-factory.index", classOf[Provider])
+    val rc = ListBuffer[MessageCodec]()
+    for( provider <- finder.singletons; codec <- provider.create ) {
+      rc += codec
+    }
+    rc.toArray
+  }
+  val codecs_by_id = Map(codecs.map(x=> (x.id, x)): _*)
+  def apply(id:String) = codecs_by_id.get(id)
 }
 
 object ProtocolHandler extends Log

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawMessageCodec.scala Tue Aug 14 18:47:38 2012
@@ -24,6 +24,10 @@ import org.apache.activemq.apollo.broker
 import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
 import org.fusesource.hawtdispatch.transport.ProtocolCodec
 
+object RawMessageCodecFactory extends MessageCodecFactory.Provider {
+  def create = Array[MessageCodec](RawMessageCodec)
+}
+
 /**
  * <p>
  * </p>
@@ -35,7 +39,7 @@ object RawMessageCodec extends MessageCo
   val PROTOCOL_ID = new AsciiBuffer(id)
   def id = "raw"
 
-  override def encode(message: Message):MessageRecord = {
+  def encode(message: Message):MessageRecord = {
     message match {
       case message:RawMessage =>
         val rc = new MessageRecord
@@ -46,13 +50,10 @@ object RawMessageCodec extends MessageCo
     }
   }
 
-  override def decode(message: MessageRecord) = {
+  def decode(message: MessageRecord) = {
     assert( message.codec == PROTOCOL_ID )
     RawMessage(message.buffer)
   }
-
-  def createProtocolHandler = throw new UnsupportedOperationException
-  def createProtocolCodec = throw new UnsupportedOperationException
 }
 
 case class RawMessage(payload:Buffer) extends Message {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Tue Aug 14 18:47:38 2012
@@ -79,86 +79,156 @@ class UdpProtocolCodec extends ProtocolC
 
 object UdpProtocolHandler extends Log
 
-class UdpMessage {
+trait DecodedUdpMessage {
 
+  /**
+   * @return The virtual host the message should get routed to.
+   * return null to route to the default virtual host
+   */
+  def host:VirtualHost
+
+  /**
+   * @return The destination name to route the message to.
+   */
+  def address:AsciiBuffer
+
+  /**
+   * @return The delivery to route.
+   */
+  def delivery:Delivery
+
+  /**
+   * @return The original UdpMesasge that the DecodedUdpMessage was
+   * constructed from.
+   */
+  def message: UdpMessage
+
+  /**
+   * @return The SecurityContext to authenticate and authorize against. Return
+   *         null if you want to bypass authentication and authorization.
+   */
+  def security_context:SecurityContext
 }
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class UdpProtocolHandler extends ProtocolHandler {
+abstract class UdpProtocolHandler extends ProtocolHandler {
   import UdpProtocolHandler._
 
   def protocol = "udp"
   def session_id = None
 
-  var buffer_size = 0
-  var host:VirtualHost = _
+  var buffer_size = 64*1024
   var connection_log:Log = _
   var config:UdpDTO = _
+  var messages_received = 0L
+  var waiting_on = "client request"
 
   def broker = connection.connector.broker
   def queue = connection.dispatch_queue
 
+  override def create_connection_status = {
+    var rc = super.create_connection_status
+    rc.waiting_on = waiting_on
+    rc.messages_received = messages_received
+    rc
+  }
+
+  def configure(config:UdpDTO) = {
+    this.config = config
+    buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
+  }
+
   override def on_transport_connected = {
     connection.transport.resumeRead
     import collection.JavaConversions._
 
-    config = (connection.connector.config match {
+    configure((connection.connector.config match {
       case connector_config:AcceptingConnectorDTO =>
         connector_config.protocols.flatMap{ _ match {
           case x:UdpDTO => Some(x)
           case _ => None
         }}.headOption
       case _ => None
-    }).getOrElse(new UdpDTO)
+    }).getOrElse(new UdpDTO) )
+  }
 
-    buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
-    decoder.init(this)
+  def suspend_read(reason: String) = {
+    waiting_on = reason
+    connection.transport.suspendRead
+  }
 
-    broker.dispatch_queue {
-      var host = broker.get_default_virtual_host
-      queue {
-        this.host = host
-        connection_log = this.host.connection_log
-        connection.transport.resumeRead()
-        if(host==null) {
-          warn("Could not find default virtual host")
-          connection.stop(NOOP)
-        }
-      }
-    }
-    
+  def resume_read() = {
+    waiting_on = "client request"
+    connection.transport.resumeRead
   }
 
-  var producerRoutes = new LRUCache[AsciiBuffer, StompProducerRoute](1000) {
-    override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
-      host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
+
+  var producerRoutes = new LRUCache[(VirtualHost, AsciiBuffer, SecurityContext#Key), UdpProducerRoute](1000) {
+    override def onCacheEviction(eldest: Entry[(VirtualHost, AsciiBuffer, SecurityContext#Key), UdpProducerRoute]) = {
+      val (host, address, key) = eldest.getKey
+      val route = eldest.getValue
+      host.dispatch_queue {
+        host.router.disconnect(route.addresses, route)
+      }
     }
   }
 
   override def on_transport_command(command: AnyRef) = {
-    val msg = command.asInstanceOf[UdpMessage]
-    val address = decoder.address(msg)
-    var route = producerRoutes.get(address);
-    if( route == null ) {
-      route = new StompProducerRoute(address)
-      producerRoutes.put(address, route)
-      val security_context = new SecurityContext
-      security_context.connector_id = connection.connector.id
-      security_context.local_address = connection.transport.getLocalAddress
-      host.dispatch_queue {
-        val rc = host.router.connect(route.addresses, route, security_context)
-        if( rc.isDefined ) {
-
+    decode(command.asInstanceOf[UdpMessage]) match {
+      case Some(msg) =>
+        messages_received += 1
+        val address = msg.address
+        var host = msg.host
+        if( host == null ) {
+          host = broker.default_virtual_host
         }
-      }
+        val security_context = msg.security_context
+        var sc_key = if( security_context!=null) security_context.to_key else null
+        var route = producerRoutes.get((host, address, sc_key));
+        if( route == null ) {
+          route = new UdpProducerRoute(host, address)
+          producerRoutes.put((host, address, sc_key), route)
+
+          def fail_connect = {
+            // Just drop messages..
+            route.sink_switch.downstream = Some(BlackHoleSink())
+          }
+
+          def continue_connect = host.dispatch_queue {
+            host.router.connect(route.addresses, route, security_context) match {
+              case Some(error) => queue {
+                fail_connect
+              }
+              case None =>
+            }
+          }
+
+          if( security_context!=null && host.authenticator!=null &&  host.authorizer!=null ) {
+            suspend_read("authenticating")
+            host.authenticator.authenticate(security_context) { auth_failure=>
+              queue {
+                resume_read
+                auth_failure match {
+                  case null=> continue_connect
+                  case auth_failure=> fail_connect
+                }
+              }
+            }
+          } else {
+            continue_connect
+          }
+        }
+
+        route.send(msg)
+
+      case None =>
     }
-    route.send(msg);
   }
 
-  class StompProducerRoute(dest: AsciiBuffer) extends DeliveryProducerRoute(host.router) {
-    val addresses = decoder.decode_addresses(dest)
+  class UdpProducerRoute(host:VirtualHost, dest: AsciiBuffer) extends DeliveryProducerRoute(host.router) {
+    val addresses = decode_address(dest.toString)
     val key = addresses.toList
     
     override def send_buffer_size = buffer_size
@@ -179,20 +249,20 @@ class UdpProtocolHandler extends Protoco
       sink_switch.downstream = Some(this)
     }
 
-    def send(frame:UdpMessage) = {
+    def send(frame:DecodedUdpMessage) = {
       // Drop older entries to make room for this new one..
       while( inbound_queue_size >= buffer_size ) {
         inbound_queue.removeFirst
       }
       
-      val delivery = decode_delivery(frame)
+      val delivery = frame.delivery
       inbound_queue_size += delivery.size
       inbound_queue.offer(delivery)
     }
   }
 
-
-  abstract def decode_delivery(message: UdpMessage):Delivery
+  def decode(message: UdpMessage):Option[DecodedUdpMessage]
+  def decode_address(address:String):Array[SimpleAddress]
 }
 
 /**
@@ -202,29 +272,40 @@ class UdpProtocolHandler extends Protoco
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class UdpProtocol extends Protocol {
+class UdpProtocol extends BaseProtocol {
 
   def id = "udp"
   def createProtocolCodec:ProtocolCodec = new UdpProtocolCodec()
+
   def createProtocolHandler:ProtocolHandler = new UdpProtocolHandler {
 
+    var default_host:VirtualHost = _
     var topic_address:AsciiBuffer = _
-    var topic_address_decoded:Array[SimpleAddress] = _
 
-    def init(handler:UdpProtocolHandler) = {
-      val topic_name = Option(handler.config.topic).getOrElse("udp")
-      topic_address_decoded = LocalRouter.destination_parser.decode_multi_destination(topic_name, (name)=> LocalRouter.destination_parser.decode_single_destination("topic:"+name, null))
+    override def configure(config: UdpDTO) {
+      super.configure(config)
+      val topic_address_decoded = decode_address(Option(config.topic).getOrElse("udp"))
       topic_address = new AsciiBuffer(LocalRouter.destination_parser.encode_destination(topic_address_decoded))
+      default_host = broker.default_virtual_host
     }
 
-    def address(message: UdpMessage) = topic_address
-    def decode_addresses(value: AsciiBuffer) = topic_address_decoded
-    def decode_delivery(message: UdpMessage) = {
-      val delivery = new Delivery
-      delivery.size = message.buffer.remaining()
-      delivery.message = RawMessage(new Buffer(message.buffer))
-      delivery
+    def decode_address(address:String):Array[SimpleAddress] = {
+      LocalRouter.destination_parser.decode_multi_destination(address, (name)=> LocalRouter.destination_parser.decode_single_destination("topic:"+name, null))
     }
 
+    def decode(udp: UdpMessage) = Some(new DecodedUdpMessage {
+      def message = udp
+      def host = null
+      def security_context = null
+      def address = topic_address
+
+      def delivery = {
+        val delivery = new Delivery
+        delivery.size = message.buffer.remaining()
+        delivery.message = RawMessage(new Buffer(message.buffer))
+        delivery
+      }
+    })
+
   }
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/SecurityContext.scala Tue Aug 14 18:47:38 2012
@@ -23,6 +23,7 @@ import java.net.SocketAddress
 import org.apache.activemq.apollo.broker.Broker.BLOCKABLE_THREAD_POOL
 import org.fusesource.hawtdispatch._
 import javax.security.auth.login.LoginContext
+import scala.collection.mutable.ListBuffer
 
 /**
  * <p>
@@ -42,6 +43,17 @@ class SecurityContext {
   var login_context:LoginContext = _
   var session_id:Option[String] = None
 
+  case class Key(user:String,
+    password:String,
+    sso_token:String,
+    certificates:ListBuffer[X509Certificate],
+    connector_id:String,
+    local_address:SocketAddress,
+    remote_address:SocketAddress)
+
+  def to_key = Key(user, password, sso_token, if(certificates==null) ListBuffer() else ListBuffer(certificates : _*),
+    connector_id, local_address, remote_address)
+
   def credential_dump = {
     var rc = List[String]()
     if(certificates!=null) {
@@ -66,7 +78,6 @@ class SecurityContext {
   }
 
   private var _subject:Subject = _
-
   def subject = _subject
 
   private var _principals = Set[Principal]()

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala Tue Aug 14 18:47:38 2012
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.Atomi
 class MessageRecord {
 
   var key = -1L
-  var protocol: AsciiBuffer = _
+  var codec: AsciiBuffer = _
   var buffer: Buffer = _
   var compressed: Buffer = _
   var direct_buffer: DirectBuffer = _

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Tue Aug 14 18:47:38 2012
@@ -30,7 +30,7 @@ object PBSupport {
   implicit def to_pb(v: MessageRecord):MessagePB.Bean = {
     val pb = new MessagePB.Bean
     pb.setMessageKey(v.key)
-    pb.setProtocol(v.protocol)
+    pb.setCodec(v.codec)
     pb.setValue(v.buffer)
     pb
   }
@@ -38,7 +38,7 @@ object PBSupport {
   implicit def from_pb(pb: MessagePB.Getter):MessageRecord = {
     val rc = new MessageRecord
     rc.key = pb.getMessageKey
-    rc.protocol = pb.getProtocol
+    rc.codec = pb.getCodec
     rc.buffer = pb.getValue
     rc
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Tue Aug 14 18:47:38 2012
@@ -92,7 +92,7 @@ abstract class StoreFunSuiteSupport exte
 
   def add_message(batch:StoreUOW, content:String) = {
     var message = new MessageRecord
-    message.protocol = ascii("test-protocol")
+    message.codec = ascii("test-protocol")
     message.buffer = ascii(content).buffer
     message.locator = new AtomicReference[Object]()
     val key = batch.store(message)

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DetectDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DetectDTO.java?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DetectDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DetectDTO.java Tue Aug 14 18:47:38 2012
@@ -39,6 +39,9 @@ public class DetectDTO extends ProtocolD
     @XmlAttribute(name="timeout")
     public Long timeout;
 
+    @XmlAttribute(name="protocols")
+    public String protocols;
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java Tue Aug 14 18:47:38 2012
@@ -33,13 +33,6 @@ import javax.xml.bind.annotation.XmlRoot
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class UdpDTO extends ProtocolDTO {
 
-    /**
-     * Class name of the decoder that will be used to interpret the
-     * UDP message
-     */
-    @XmlAttribute(name="decoder")
-    public String decoder;
-
     @XmlAttribute(name="buffer_size")
     public String buffer_size;
 
@@ -48,13 +41,24 @@ public class UdpDTO extends ProtocolDTO 
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        UdpDTO detectDTO = (UdpDTO) o;
-
-        if (decoder != null ? !decoder.equals(detectDTO.decoder) : detectDTO.decoder != null) return false;
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof UdpDTO)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+
+        UdpDTO udpDTO = (UdpDTO) o;
+
+        if (buffer_size != null ? !buffer_size.equals(udpDTO.buffer_size) : udpDTO.buffer_size != null) {
+            return false;
+        }
+        if (topic != null ? !topic.equals(udpDTO.topic) : udpDTO.topic != null) {
+            return false;
+        }
 
         return true;
     }
@@ -62,7 +66,8 @@ public class UdpDTO extends ProtocolDTO 
     @Override
     public int hashCode() {
         int result = super.hashCode();
-        result = 31 * result + (decoder != null ? decoder.hashCode() : 0);
+        result = 31 * result + (buffer_size != null ? buffer_size.hashCode() : 0);
+        result = 31 * result + (topic != null ? topic.hashCode() : 0);
         return result;
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/LevelDBClient.scala Tue Aug 14 18:47:38 2012
@@ -842,7 +842,7 @@ class LevelDBClient(store: LevelDBStore)
                       if (message_record != null) {
 
                         val pb = new MessagePB.Bean
-                        pb.setProtocol(message_record.protocol)
+                        pb.setCodec(message_record.codec)
 
                         val body = if(message_record.compressed!=null) {
                           pb.setCompression(1)
@@ -1344,7 +1344,7 @@ class LevelDBClient(store: LevelDBStore)
 
               case record: MessagePB.Buffer =>
                 val pb = new MessagePB.Bean
-                pb.setProtocol(record.getProtocol)
+                pb.setCodec(record.getCodec)
                 val body = if(snappy_compress_logs) {
                   val compressed = Snappy.compress(record.getValue)
                   if (compressed.length < record.getValue.length) {

Copied: activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index (from r1373021, activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index?p2=activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index&r1=1373021&r2=1373022&rev=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.protocol.RawMessageCodec
\ No newline at end of file
+org.apache.activemq.apollo.openwire.OpenwireMessageCodecFactory
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.openwire.OpenwireProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.openwire.OpenwireProtocol
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala Tue Aug 14 18:47:38 2012
@@ -41,7 +41,7 @@ object OpenwireCodec extends Sizer[Comma
 
   def encode(message: Message):MessageRecord = {
     val rc = new MessageRecord
-    rc.protocol = PROTOCOL
+    rc.codec = PROTOCOL
 
     val msg = message.asInstanceOf[OpenwireMessage];
     rc.buffer = msg.message.getCachedEncoding match {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala Tue Aug 14 18:47:38 2012
@@ -39,7 +39,7 @@ class OpenwireMessage(val message:Active
 
   def getLocalConnectionId = message.getProducerId.getConnectionId
 
-  def protocol = OpenwireProtocol
+  def codec = OpenwireMessageCodec
 
   def getBodyAs[T](toType : Class[T]) = {
     (message match {
@@ -78,7 +78,7 @@ object EndOfBrowseMessage extends Messag
   def retained(): Int = 0
   def retain() {}
   def release() {}
-  def protocol: Protocol = null
+  def codec = null
   def priority: Byte = 0
   def persistent: Boolean = false
   def expiration: Long = 0L

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala Tue Aug 14 18:47:38 2012
@@ -20,7 +20,7 @@ package org.apache.activemq.apollo.openw
 import org.apache.activemq.apollo.broker.store.MessageRecord
 import org.apache.activemq.apollo.broker.Message
 import OpenwireConstants._
-import org.apache.activemq.apollo.broker.protocol.{ProtocolCodecFactory, Protocol, ProtocolFactory}
+import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, MessageCodec, ProtocolCodecFactory, Protocol}
 import org.fusesource.hawtbuf.Buffer
 import org.apache.activemq.apollo.util.Log
 
@@ -30,40 +30,31 @@ import org.apache.activemq.apollo.util.L
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object OpenwireProtocolFactory extends ProtocolFactory {
+object OpenwireProtocol extends OpenwireProtocolCodecFactory with Protocol with Log {
 
-  def create() = OpenwireProtocol
+  def createProtocolHandler = new OpenwireProtocolHandler
 
-  def create(config: String) = if(config == PROTOCOL) {
-    OpenwireProtocol
-  } else {
+  lazy val log_exerimental_warning = {
+    warn("The OpenWire protocol implementation is still experimental and not recommended for production use.  Production users should use ActiveMQ instead.")
     null
   }
+}
 
+object OpenwireMessageCodecFactory extends MessageCodecFactory.Provider {
+  def create = Array[MessageCodec](OpenwireMessageCodec)
 }
 
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object OpenwireProtocol extends OpenwireProtocolCodecFactory with Protocol with Log {
+object OpenwireMessageCodec extends MessageCodec {
 
-  def createProtocolHandler = new OpenwireProtocolHandler
+  def id = "openwire"
 
   def encode(message: Message):MessageRecord = {
     OpenwireCodec.encode(message)
   }
-
   def decode(message: MessageRecord) = {
     OpenwireCodec.decode(message)
   }
 
-  lazy val log_exerimental_warning = {
-    warn("The OpenWire protocol implementation is still experimental and not recommended for production use.  Production users should use ActiveMQ instead.")
-    null
-  }
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Aug 14 18:47:38 2012
@@ -935,7 +935,7 @@ class OpenwireProtocolHandler extends Pr
     override def receive_buffer_size = buffer_size
 
     def matches(delivery:Delivery) = {
-      if( delivery.message.protocol eq OpenwireProtocol ) {
+      if( delivery.message.codec eq OpenwireMessageCodec ) {
         if( selector_expression!=null ) {
           selector_expression.matches(delivery.message)
         } else {

Copied: activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index (from r1373021, activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index?p2=activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index&p1=activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index&r1=1373021&r2=1373022&rev=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/message-codec-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.stomp.StompProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.stomp.StompMessageCodecFactory
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Tue Aug 14 18:47:38 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.stomp.StompProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.stomp.StompProtocol
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Tue Aug 14 18:47:38 2012
@@ -40,7 +40,7 @@ object StompCodec extends Log {
     val frame = message.frame
 
     val rc = new MessageRecord
-    rc.protocol = PROTOCOL
+    rc.codec = PROTOCOL
 
     if( frame.content.isInstanceOf[ZeroCopyContent] ) {
       rc.direct_buffer = frame.content.asInstanceOf[ZeroCopyContent].zero_copy_buffer

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Tue Aug 14 18:47:38 2012
@@ -37,7 +37,7 @@ import Stomp._
  */
 case class StompFrameMessage(frame:StompFrame) extends Message {
   
-  def protocol = StompProtocol
+  def codec = StompMessageCodec
 
   /**
    * the globally unique id of the message

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Tue Aug 14 18:47:38 2012
@@ -18,10 +18,8 @@ package org.apache.activemq.apollo.stomp
 
 import _root_.org.fusesource.hawtbuf._
 import org.apache.activemq.apollo.broker._
-import java.lang.String
-import protocol.{ProtocolCodecFactory, ProtocolFactory, Protocol}
+import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, MessageCodec, ProtocolCodecFactory, Protocol}
 import Stomp._
-import org.fusesource.hawtdispatch.transport._
 import org.apache.activemq.apollo.broker.store._
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -51,24 +49,23 @@ class StompProtocolCodecFactory extends 
   }
 }
 
-class StompProtocolFactory extends ProtocolFactory {
-
-  def create() = StompProtocol
-
-  def create(config: String) = if(config == "stomp") {
-    StompProtocol
-  } else {
-    null
-  }
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object StompProtocol extends StompProtocolCodecFactory with Protocol {
+  def createProtocolHandler = new StompProtocolHandler
+}
 
+object StompMessageCodecFactory extends MessageCodecFactory.Provider {
+  def create = Array[MessageCodec](StompMessageCodec)
 }
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object StompProtocol extends StompProtocolCodecFactory with Protocol {
+object StompMessageCodec extends MessageCodec{
 
-  def createProtocolHandler = new StompProtocolHandler
+  def id = "stomp"
 
   def encode(message: Message):MessageRecord = {
     StompCodec.encode(message.asInstanceOf[StompFrameMessage])
@@ -81,3 +78,5 @@ object StompProtocol extends StompProtoc
 }
 
 
+
+

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1373022&r1=1373021&r2=1373022&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Aug 14 18:47:38 2012
@@ -422,12 +422,12 @@ class StompProtocolHandler extends Proto
       val (_, delivery) = event
 
       val message = delivery.message
-      var frame = if( message.protocol eq StompProtocol ) {
+      var frame = if( message.codec eq StompMessageCodec ) {
         message.asInstanceOf[StompFrameMessage].frame
       } else {
         val (body, content_type) =  protocol_convert match{
-          case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.protocol.id()+";conv=body")
-          case _ => (message.encoded, "protocol/"+message.protocol.id())
+          case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.codec.id+";conv=body")
+          case _ => (message.encoded, "protocol/"+message.codec.id())
         }
         message_id_counter += 1
         var headers =  (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil