You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/25 01:33:16 UTC

svn commit: r1377147 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-broker/src/main/scala/org/apache/activemq/apoll...

Author: chirino
Date: Fri Aug 24 23:33:15 2012
New Revision: 1377147

URL: http://svn.apache.org/viewvc?rev=1377147&view=rev
Log:
Update need to pickup API changes in hawtdispatch-transport.  Add initial cut of a SSLProtocol which allows us to accept SSL/TLS connections on normal tcp port.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/SSLProtocol.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.java
Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-ssl.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala Fri Aug 24 23:33:15 2012
@@ -35,7 +35,7 @@ import org.fusesource.amqp.codec.AMQPPro
 class AmqpProtocolCodecFactory extends ProtocolCodecFactory.Provider {
   def id = PROTOCOL
 
-  def createProtocolCodec() = new AMQPProtocolCodec();
+  def createProtocolCodec(connector:Connector) = new AMQPProtocolCodec();
 
   def isIdentifiable() = true
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Fri Aug 24 23:33:15 2012
@@ -16,3 +16,4 @@
 ## ---------------------------------------------------------------------------
 org.apache.activemq.apollo.broker.protocol.AnyProtocol
 org.apache.activemq.apollo.broker.protocol.UdpProtocol
+org.apache.activemq.apollo.broker.protocol.SSLProtocol

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Aug 24 23:33:15 2012
@@ -41,6 +41,7 @@ import org.apache.activemq.apollo.filter
 import org.xml.sax.InputSource
 import java.util
 import javax.management.openmbean.CompositeData
+import javax.net.ssl.SSLContext
 
 /**
  * <p>
@@ -698,4 +699,14 @@ class Broker() extends BaseService with 
 
   def first_accepting_connector = connectors.values.find(_.isInstanceOf[AcceptingConnector]).map(_.asInstanceOf[AcceptingConnector])
 
+  def ssl_context(protocol:String) = {
+    val rc = SSLContext.getInstance(protocol);
+    if( key_storage!=null ) {
+      rc.init(key_storage.create_key_managers, key_storage.create_trust_managers, null);
+    } else {
+      rc.init(null, null, null);
+    }
+    rc
+  }
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Fri Aug 24 23:33:15 2012
@@ -21,7 +21,7 @@ import _root_.java.lang.{String}
 import org.fusesource.hawtdispatch._
 import protocol.{ProtocolHandler}
 import org.apache.activemq.apollo.filter.BooleanExpression
-import org.fusesource.hawtdispatch.transport.{TransportListener, DefaultTransportListener, Transport}
+import org.fusesource.hawtdispatch.transport._
 import org.apache.activemq.apollo.dto.{DestinationDTO, ConnectionStatusDTO}
 import org.apache.activemq.apollo.util.{Dispatched, Log, BaseService}
 
@@ -167,6 +167,20 @@ class BrokerConnection(var connector: Co
     }
     result
   }
+
+  def protocol_codec[T<:ProtocolCodec](clazz:Class[T]):T = {
+    var rc = transport.getProtocolCodec
+    while( rc !=null ) {
+      if( clazz.isInstance(rc) ) {
+        return clazz.cast(rc);
+      }
+      rc = rc match {
+        case rc:WrappingProtocolCodec => rc.getNext
+        case _ => null
+      }
+    }
+    return null.asInstanceOf[T]
+  }
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Fri Aug 24 23:33:15 2012
@@ -164,7 +164,7 @@ class AcceptingConnector(val broker:Brok
 
     def onAccept(transport: Transport): Unit = {
       if( protocol!=null ) {
-        transport.setProtocolCodec(protocol.createProtocolCodec)
+        transport.setProtocolCodec(protocol.createProtocolCodec(AcceptingConnector.this))
       }
 
       accepted.incrementAndGet

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Fri Aug 24 23:33:15 2012
@@ -233,15 +233,13 @@ object WebSocketTransportFactory extends
     def setProtocolCodec(protocolCodec: ProtocolCodec) = {
       this.protocolCodec = protocolCodec
       if( this.protocolCodec!=null ) {
-        this.protocolCodec.setReadableByteChannel(this)
-        this.protocolCodec.setWritableByteChannel(this)
-        this.protocolCodec match {
-          case protocolCodec:TransportAware => protocolCodec.setTransport(this);
-          case _ =>
-        }
+        this.protocolCodec.setTransport(this)
       }
     }
 
+    def getReadChannel: ReadableByteChannel = this
+    def getWriteChannel: WritableByteChannel = this
+
     def dispatch_queue = dispatchQueue
 
     def start(on_completed: Runnable):Unit = super.start(new TaskWrapper(on_completed))
@@ -343,6 +341,11 @@ object WebSocketTransportFactory extends
       }
     }
 
+    def drainInbound = {
+      inbound_dispatch_queue {
+        drain_inbound
+      }
+    }
 
     def close() {}
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Fri Aug 24 23:33:15 2012
@@ -17,17 +17,16 @@
 package org.apache.activemq.apollo.broker.protocol
 
 import org.fusesource.hawtbuf.Buffer
-import org.apache.activemq.apollo.broker.store.MessageRecord
-import java.nio.channels.{WritableByteChannel, ReadableByteChannel}
+import java.nio.channels.ReadableByteChannel
 import java.nio.ByteBuffer
 import java.io.IOException
 import java.lang.String
 import java.util.concurrent.TimeUnit
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.util.OptionSupport
-import org.apache.activemq.apollo.broker.{Message, ProtocolException}
+import org.apache.activemq.apollo.broker.{Connector, ProtocolException}
 import org.apache.activemq.apollo.dto.{DetectDTO, AcceptingConnectorDTO}
-import transport.{Transport, TransportAware, ProtocolCodec}
+import org.fusesource.hawtdispatch.transport.{WrappingProtocolCodec, Transport, ProtocolCodec}
 
 /**
  * <p>
@@ -35,18 +34,37 @@ import transport.{Transport, TransportAw
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class AnyProtocol() extends BaseProtocol {
+object AnyProtocol extends BaseProtocol {
 
   def id = "any"
 
-  def createProtocolCodec = new AnyProtocolCodec()
+  def createProtocolCodec(connector:Connector) = new AnyProtocolCodec(connector)
 
   def createProtocolHandler = new AnyProtocolHandler
+
+  def change_protocol_codec(transport:Transport, codec:ProtocolCodec) = {
+    var current = transport.getProtocolCodec
+    var wrapper:WrappingProtocolCodec = null
+    while( current!=null ) {
+      current = current match {
+        case current:WrappingProtocolCodec =>
+          wrapper = current
+          current.getNext
+        case _ => null
+      }
+    }
+    if( wrapper!=null ) {
+      wrapper.setNext(codec)
+    } else {
+      transport.setProtocolCodec(codec)
+    }
+  }
+
 }
 
 case class ProtocolDetected(id:String, codec:ProtocolCodec)
 
-class AnyProtocolCodec() extends ProtocolCodec with TransportAware {
+class AnyProtocolCodec(val connector:Connector) extends ProtocolCodec {
 
   var protocols =  ProtocolFactory.protocols.filter(_.isIdentifiable)
 
@@ -54,15 +72,17 @@ class AnyProtocolCodec() extends Protoco
     throw new IllegalArgumentException("No protocol configured for identification.")
   }
   val buffer = ByteBuffer.allocate(protocols.foldLeft(0) {(a, b) => a.max(b.maxIdentificaionLength)})
-  var channel: ReadableByteChannel = null
-
-  def setReadableByteChannel(channel: ReadableByteChannel) = {this.channel = channel}
+  def channel = transport.getReadChannel
 
   var transport:Transport = _
-  def setTransport(t: Transport) = transport = t
+  def setTransport(t: Transport) =  transport = t
+
+  var next:ProtocolCodec = _
+
+
 
   def read: AnyRef = {
-    if (channel == null) {
+    if (next != null) {
       throw new IllegalStateException
     }
 
@@ -70,14 +90,13 @@ class AnyProtocolCodec() extends Protoco
     val buff = new Buffer(buffer.array(), 0, buffer.position())
     protocols.foreach {protocol =>
       if (protocol.matchesIdentification(buff)) {
-        val protocolCodec = protocol.createProtocolCodec()
-        transport.setProtocolCodec(protocolCodec)
-        protocolCodec.unread(buff.toByteArray)
-        return ProtocolDetected(protocol.id, protocolCodec)
+        next = protocol.createProtocolCodec(connector)
+        AnyProtocol.change_protocol_codec(transport, next)
+        next.unread(buff.toByteArray)
+        return ProtocolDetected(protocol.id, next)
       }
     }
     if (buffer.position() == buffer.capacity) {
-      channel = null
       throw new IOException("Could not identify the protocol.")
     }
     return null
@@ -87,8 +106,6 @@ class AnyProtocolCodec() extends Protoco
 
   def unread(buffer: Array[Byte]) = throw new UnsupportedOperationException()
 
-  def setWritableByteChannel(channel: WritableByteChannel) = {}
-
   def write(value: Any) = ProtocolCodec.BufferState.FULL
 
   def full: Boolean = true
@@ -150,7 +167,7 @@ class AnyProtocolHandler extends Protoco
     import OptionSupport._
     import collection.JavaConversions._
 
-    var codec = connection.transport.getProtocolCodec().asInstanceOf[AnyProtocolCodec]
+    var codec = connection.protocol_codec(classOf[AnyProtocolCodec])
 
     val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
     config = connector_config.protocols.flatMap{ _ match {

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.scala?rev=1377147&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.scala Fri Aug 24 23:33:15 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import org.apache.activemq.apollo.broker.Connector
+import org.apache.activemq.apollo.util.ClassFinder
+import org.fusesource.hawtbuf.Buffer
+import org.fusesource.hawtdispatch.transport.ProtocolCodec
+
+object ProtocolCodecFactory {
+  abstract trait Provider {
+    def id: String
+
+    /**
+     * @return an instance of the wire format.
+     *
+     */
+    def createProtocolCodec(connector: Connector): ProtocolCodec
+
+    /**
+     * @return true if this wire format factory is identifiable. An identifiable
+     *         protocol will first write a easy to identify header to the stream
+     */
+    def isIdentifiable: Boolean
+
+    /**
+     * @return Returns the maximum length of the header used to discriminate the wire format if it
+     *         { @link #isIdentifiable()}
+     * @throws UnsupportedOperationException If { @link #isIdentifiable()} is false
+     */
+    def maxIdentificaionLength: Int
+
+    /**
+     * Called to test if this protocol matches the identification header.
+     *
+     * @param buffer The byte buffer representing the header data read so far.
+     * @return true if the Buffer matches the protocol format header.
+     */
+    def matchesIdentification(buffer: Buffer): Boolean
+  }
+
+  final val providers: ClassFinder[ProtocolCodecFactory.Provider] = new ClassFinder[ProtocolCodecFactory.Provider]("META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index", classOf[ProtocolCodecFactory.Provider])
+
+  /**
+   * Gets the provider.
+   */
+  def get(name: String): ProtocolCodecFactory.Provider = {
+    import scala.collection.JavaConversions._
+    for (provider <- providers.jsingletons) {
+      if (name == provider.id) {
+        return provider
+      }
+    }
+    return null
+  }
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/SSLProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/SSLProtocol.scala?rev=1377147&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/SSLProtocol.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/SSLProtocol.scala Fri Aug 24 23:33:15 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+import org.fusesource.hawtdispatch.transport.SSLProtocolCodec
+import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.broker.Connector
+
+/**
+ */
+class SSLProtocol extends Protocol {
+  def id(): String = "ssl"
+
+  override def isIdentifiable = true
+  override def maxIdentificaionLength = 5
+  override def matchesIdentification(buffer: Buffer):Boolean = {
+    if( buffer.length >= 5 ) {
+
+      // We have variable header offset..
+      ((buffer.get(0) & 0xC0) == 0x80) && // The rest of byte 0 and 1 are holds the record length.
+      (buffer.get(2) == 1) && // Client Hello
+      (
+        (
+          (buffer.get(3) == 2) // SSLv2
+        ) || (
+          (buffer.get(3) == 3) && // SSLv3 or TLS
+          (buffer.get(4) match {  // Minor version
+            case 0 => true // SSLv3
+            case 1 => true // TLSv1
+            case 2 => true // TLSv2
+            case 3 => true // TLSv3
+            case _ => false
+          })
+        )
+      )
+    } else {
+      false
+    }
+  }
+
+  def createProtocolCodec(connector:Connector) = {
+    val rc = new SSLProtocolCodec()
+    rc.setSSLContext(connector.broker.ssl_context("SSL"))
+    rc.server(SSLProtocolCodec.ClientAuth.NONE)
+    rc.setNext(new AnyProtocolCodec(connector))
+    rc
+  }
+
+  def createProtocolHandler = new AnyProtocolHandler
+}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Fri Aug 24 23:33:15 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.broker.protocol
 
-import org.fusesource.hawtdispatch.transport.ProtocolCodec
+import org.fusesource.hawtdispatch.transport.{Transport, ProtocolCodec}
 import java.nio.ByteBuffer
 import org.fusesource.hawtdispatch._
 import java.nio.channels.{DatagramChannel, WritableByteChannel, ReadableByteChannel}
@@ -38,8 +38,10 @@ class UdpProtocolCodec extends ProtocolC
   def protocol = "udp"
 
   var channel: DatagramChannel = null
-  def setReadableByteChannel(channel: ReadableByteChannel) = {
-    this.channel = channel.asInstanceOf[DatagramChannel]
+
+
+  def setTransport(transport: Transport) {
+    this.channel = transport.getReadChannel.asInstanceOf[DatagramChannel]
   }
 
   var read_counter = 0L
@@ -68,7 +70,6 @@ class UdpProtocolCodec extends ProtocolC
   def unread(buffer: Array[Byte]) = throw new UnsupportedOperationException()
 
   // This protocol only supports receiving..
-  def setWritableByteChannel(channel: WritableByteChannel) = {}
   def write(value: AnyRef) = ProtocolCodec.BufferState.FULL
   def full: Boolean = true
   def flush = ProtocolCodec.BufferState.FULL
@@ -285,7 +286,7 @@ abstract class UdpProtocolHandler extend
 class UdpProtocol extends BaseProtocol {
 
   def id = "udp"
-  def createProtocolCodec:ProtocolCodec = new UdpProtocolCodec()
+  def createProtocolCodec(connector:Connector):ProtocolCodec = new UdpProtocolCodec()
 
   def createProtocolHandler:ProtocolHandler = new UdpProtocolHandler {
     type ConfigTypeDTO = UdpDTO

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala Fri Aug 24 23:33:15 2012
@@ -18,7 +18,7 @@
 package org.apache.activemq.apollo.openwire
 
 import org.apache.activemq.apollo.broker.store.MessageRecord
-import org.apache.activemq.apollo.broker.Message
+import org.apache.activemq.apollo.broker.{Connector, Message}
 import OpenwireConstants._
 import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, MessageCodec, ProtocolCodecFactory, Protocol}
 import org.fusesource.hawtbuf.Buffer
@@ -68,7 +68,7 @@ class OpenwireProtocolCodecFactory exten
 
   def id = PROTOCOL
 
-  def createProtocolCodec() = {
+  def createProtocolCodec(connector:Connector) = {
     OpenwireProtocol.log_exerimental_warning
     new OpenwireCodec();
   }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Aug 24 23:33:15 2012
@@ -145,7 +145,7 @@ class OpenwireProtocolHandler extends Pr
     super.set_connection(connection)
     import collection.JavaConversions._
 
-    codec = connection.transport.getProtocolCodec.asInstanceOf[OpenwireCodec]
+    codec = connection.protocol_codec(classOf[OpenwireCodec])
     var connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
     config = connector_config.protocols.find( _.isInstanceOf[OpenwireDTO]).map(_.asInstanceOf[OpenwireDTO]).getOrElse(new OpenwireDTO)
 
@@ -384,7 +384,7 @@ class OpenwireProtocolHandler extends Pr
       die("Remote wire format (%s) is lower the minimum version required (%s)".format(info.getVersion(), minimum_protocol_version))
     }
 
-    wire_format = connection.transport.getProtocolCodec.asInstanceOf[OpenwireCodec].format
+    wire_format = connection.protocol_codec(classOf[OpenwireCodec]).format
     wire_format.renegotiateWireFormat(info, preferred_wireformat_settings)
 
     val inactive_time = preferred_wireformat_settings.getMaxInactivityDuration().min(info.getMaxInactivityDuration())

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Fri Aug 24 23:33:15 2012
@@ -34,7 +34,7 @@ class StompProtocolCodecFactory extends 
 
   def id = PROTOCOL
 
-  def createProtocolCodec() = new StompCodec();
+  def createProtocolCodec(connector:Connector) = new StompCodec();
 
   def isIdentifiable() = true
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Aug 24 23:33:15 2012
@@ -663,7 +663,7 @@ class StompProtocolHandler extends Proto
     super.set_connection(connection)
     import collection.JavaConversions._
 
-    codec = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+    codec = connection.protocol_codec(classOf[StompCodec])
     val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
     config = connector_config.protocols.find( _.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
 
@@ -949,7 +949,7 @@ class StompProtocolHandler extends Proto
 
     if( protocol_version != V1_0 ) {
       // disable trimming...
-      connection.transport.getProtocolCodec.asInstanceOf[StompCodec].trim = false
+      codec.trim = false
     }
 
     val heart_beat = get(headers, HEART_BEAT).getOrElse(DEFAULT_HEART_BEAT)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-ssl.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-ssl.xml?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-ssl.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-ssl.xml Fri Aug 24 23:33:15 2012
@@ -24,5 +24,6 @@
 
     <key_storage file="${basedir}/src/test/resources/apollo.ks" password="password" key_password="password"/>
     <connector id="ssl" bind="ssl://0.0.0.0:0" />
+    <connector id="tcp" bind="tcp://0.0.0.0:0" />
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala Fri Aug 24 23:33:15 2012
@@ -36,4 +36,8 @@ class StompSslTest extends StompTestSupp
   test("Connect over SSL") {
     connect("1.1")
   }
+
+  test("Detect SSL on TCP port") {
+    connect("1.1", connector="tcp")
+  }
 }