You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/25 01:33:16 UTC
svn commit: r1377147 - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/
apollo-broker/src/main/scala/org/apache/activemq/apoll...
Author: chirino
Date: Fri Aug 24 23:33:15 2012
New Revision: 1377147
URL: http://svn.apache.org/viewvc?rev=1377147&view=rev
Log:
Updates needed to pick up API changes in hawtdispatch-transport. Add an initial cut of an SSLProtocol, which allows us to accept SSL/TLS connections on a normal TCP port.
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/SSLProtocol.scala
Removed:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.java
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-ssl.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala Fri Aug 24 23:33:15 2012
@@ -35,7 +35,7 @@ import org.fusesource.amqp.codec.AMQPPro
class AmqpProtocolCodecFactory extends ProtocolCodecFactory.Provider {
def id = PROTOCOL
- def createProtocolCodec() = new AMQPProtocolCodec();
+ def createProtocolCodec(connector:Connector) = new AMQPProtocolCodec();
def isIdentifiable() = true
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Fri Aug 24 23:33:15 2012
@@ -16,3 +16,4 @@
## ---------------------------------------------------------------------------
org.apache.activemq.apollo.broker.protocol.AnyProtocol
org.apache.activemq.apollo.broker.protocol.UdpProtocol
+org.apache.activemq.apollo.broker.protocol.SSLProtocol
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Aug 24 23:33:15 2012
@@ -41,6 +41,7 @@ import org.apache.activemq.apollo.filter
import org.xml.sax.InputSource
import java.util
import javax.management.openmbean.CompositeData
+import javax.net.ssl.SSLContext
/**
* <p>
@@ -698,4 +699,14 @@ class Broker() extends BaseService with
def first_accepting_connector = connectors.values.find(_.isInstanceOf[AcceptingConnector]).map(_.asInstanceOf[AcceptingConnector])
+ def ssl_context(protocol:String) = { // Creates and initializes an SSLContext for the given protocol name.
+ val rc = SSLContext.getInstance(protocol);
+ if( key_storage!=null ) { // A key storage is set: initialize with its key managers and trust managers.
+ rc.init(key_storage.create_key_managers, key_storage.create_trust_managers, null);
+ } else { // No key storage: all-null arguments make the JDK fall back to its default managers and SecureRandom.
+ rc.init(null, null, null);
+ }
+ rc // The initialized context is the result of the method.
+ }
+
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Fri Aug 24 23:33:15 2012
@@ -21,7 +21,7 @@ import _root_.java.lang.{String}
import org.fusesource.hawtdispatch._
import protocol.{ProtocolHandler}
import org.apache.activemq.apollo.filter.BooleanExpression
-import org.fusesource.hawtdispatch.transport.{TransportListener, DefaultTransportListener, Transport}
+import org.fusesource.hawtdispatch.transport._
import org.apache.activemq.apollo.dto.{DestinationDTO, ConnectionStatusDTO}
import org.apache.activemq.apollo.util.{Dispatched, Log, BaseService}
@@ -167,6 +167,20 @@ class BrokerConnection(var connector: Co
}
result
}
+
+ def protocol_codec[T<:ProtocolCodec](clazz:Class[T]):T = { // Returns the first codec in the transport's codec chain that is an instance of clazz, or null if none is.
+ var rc = transport.getProtocolCodec
+ while( rc !=null ) {
+ if( clazz.isInstance(rc) ) {
+ return clazz.cast(rc);
+ }
+ rc = rc match { // Only a WrappingProtocolCodec has a successor; any other codec ends the walk.
+ case rc:WrappingProtocolCodec => rc.getNext
+ case _ => null
+ }
+ }
+ return null.asInstanceOf[T] // No codec in the chain matched clazz.
+ }
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Fri Aug 24 23:33:15 2012
@@ -164,7 +164,7 @@ class AcceptingConnector(val broker:Brok
def onAccept(transport: Transport): Unit = {
if( protocol!=null ) {
- transport.setProtocolCodec(protocol.createProtocolCodec)
+ transport.setProtocolCodec(protocol.createProtocolCodec(AcceptingConnector.this))
}
accepted.incrementAndGet
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Fri Aug 24 23:33:15 2012
@@ -233,15 +233,13 @@ object WebSocketTransportFactory extends
def setProtocolCodec(protocolCodec: ProtocolCodec) = {
this.protocolCodec = protocolCodec
if( this.protocolCodec!=null ) {
- this.protocolCodec.setReadableByteChannel(this)
- this.protocolCodec.setWritableByteChannel(this)
- this.protocolCodec match {
- case protocolCodec:TransportAware => protocolCodec.setTransport(this);
- case _ =>
- }
+ this.protocolCodec.setTransport(this)
}
}
+ def getReadChannel: ReadableByteChannel = this
+ def getWriteChannel: WritableByteChannel = this
+
def dispatch_queue = dispatchQueue
def start(on_completed: Runnable):Unit = super.start(new TaskWrapper(on_completed))
@@ -343,6 +341,11 @@ object WebSocketTransportFactory extends
}
}
+ def drainInbound = { // Passes a block that calls drain_inbound to inbound_dispatch_queue.
+ inbound_dispatch_queue { // NOTE(review): presumably this schedules the block on that queue rather than running it inline; confirm.
+ drain_inbound
+ }
+ }
def close() {}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Fri Aug 24 23:33:15 2012
@@ -17,17 +17,16 @@
package org.apache.activemq.apollo.broker.protocol
import org.fusesource.hawtbuf.Buffer
-import org.apache.activemq.apollo.broker.store.MessageRecord
-import java.nio.channels.{WritableByteChannel, ReadableByteChannel}
+import java.nio.channels.ReadableByteChannel
import java.nio.ByteBuffer
import java.io.IOException
import java.lang.String
import java.util.concurrent.TimeUnit
import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.util.OptionSupport
-import org.apache.activemq.apollo.broker.{Message, ProtocolException}
+import org.apache.activemq.apollo.broker.{Connector, ProtocolException}
import org.apache.activemq.apollo.dto.{DetectDTO, AcceptingConnectorDTO}
-import transport.{Transport, TransportAware, ProtocolCodec}
+import org.fusesource.hawtdispatch.transport.{WrappingProtocolCodec, Transport, ProtocolCodec}
/**
* <p>
@@ -35,18 +34,37 @@ import transport.{Transport, TransportAw
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class AnyProtocol() extends BaseProtocol {
+object AnyProtocol extends BaseProtocol {
def id = "any"
- def createProtocolCodec = new AnyProtocolCodec()
+ def createProtocolCodec(connector:Connector) = new AnyProtocolCodec(connector)
def createProtocolHandler = new AnyProtocolHandler
+
+ def change_protocol_codec(transport:Transport, codec:ProtocolCodec) = { // Installs codec after the last wrapping codec in the chain, or directly on the transport if there is no wrapper.
+ var current = transport.getProtocolCodec
+ var wrapper:WrappingProtocolCodec = null
+ while( current!=null ) {
+ current = current match {
+ case current:WrappingProtocolCodec =>
+ wrapper = current // Remember the last wrapper encountered while walking the chain.
+ current.getNext
+ case _ => null // A non-wrapping codec ends the walk.
+ }
+ }
+ if( wrapper!=null ) {
+ wrapper.setNext(codec) // Replace whatever followed the last wrapper.
+ } else {
+ transport.setProtocolCodec(codec) // No wrapper found: set the codec on the transport itself.
+ }
+ }
+
}
case class ProtocolDetected(id:String, codec:ProtocolCodec)
-class AnyProtocolCodec() extends ProtocolCodec with TransportAware {
+class AnyProtocolCodec(val connector:Connector) extends ProtocolCodec {
var protocols = ProtocolFactory.protocols.filter(_.isIdentifiable)
@@ -54,15 +72,17 @@ class AnyProtocolCodec() extends Protoco
throw new IllegalArgumentException("No protocol configured for identification.")
}
val buffer = ByteBuffer.allocate(protocols.foldLeft(0) {(a, b) => a.max(b.maxIdentificaionLength)})
- var channel: ReadableByteChannel = null
-
- def setReadableByteChannel(channel: ReadableByteChannel) = {this.channel = channel}
+ def channel = transport.getReadChannel
var transport:Transport = _
- def setTransport(t: Transport) = transport = t
+ def setTransport(t: Transport) = transport = t
+
+ var next:ProtocolCodec = _
+
+
def read: AnyRef = {
- if (channel == null) {
+ if (next != null) {
throw new IllegalStateException
}
@@ -70,14 +90,13 @@ class AnyProtocolCodec() extends Protoco
val buff = new Buffer(buffer.array(), 0, buffer.position())
protocols.foreach {protocol =>
if (protocol.matchesIdentification(buff)) {
- val protocolCodec = protocol.createProtocolCodec()
- transport.setProtocolCodec(protocolCodec)
- protocolCodec.unread(buff.toByteArray)
- return ProtocolDetected(protocol.id, protocolCodec)
+ next = protocol.createProtocolCodec(connector)
+ AnyProtocol.change_protocol_codec(transport, next)
+ next.unread(buff.toByteArray)
+ return ProtocolDetected(protocol.id, next)
}
}
if (buffer.position() == buffer.capacity) {
- channel = null
throw new IOException("Could not identify the protocol.")
}
return null
@@ -87,8 +106,6 @@ class AnyProtocolCodec() extends Protoco
def unread(buffer: Array[Byte]) = throw new UnsupportedOperationException()
- def setWritableByteChannel(channel: WritableByteChannel) = {}
-
def write(value: Any) = ProtocolCodec.BufferState.FULL
def full: Boolean = true
@@ -150,7 +167,7 @@ class AnyProtocolHandler extends Protoco
import OptionSupport._
import collection.JavaConversions._
- var codec = connection.transport.getProtocolCodec().asInstanceOf[AnyProtocolCodec]
+ var codec = connection.protocol_codec(classOf[AnyProtocolCodec])
val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
config = connector_config.protocols.flatMap{ _ match {
Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.scala?rev=1377147&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolCodecFactory.scala Fri Aug 24 23:33:15 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import org.apache.activemq.apollo.broker.Connector
+import org.apache.activemq.apollo.util.ClassFinder
+import org.fusesource.hawtbuf.Buffer
+import org.fusesource.hawtdispatch.transport.ProtocolCodec
+
+object ProtocolCodecFactory {
+ abstract trait Provider {
+ def id: String
+
+ /**
+ * @return a protocol codec (wire format) instance for the given connector.
+ *
+ */
+ def createProtocolCodec(connector: Connector): ProtocolCodec
+
+ /**
+ * @return true if this wire format factory is identifiable. An identifiable
+ * protocol will first write an easy-to-identify header to the stream.
+ */
+ def isIdentifiable: Boolean
+
+ /**
+ * @return the maximum length of the header used to discriminate the wire format,
+ * if the protocol is identifiable (see isIdentifiable).
+ * @throws UnsupportedOperationException if isIdentifiable is false
+ */
+ def maxIdentificaionLength: Int
+
+ /**
+ * Called to test if this protocol matches the identification header.
+ *
+ * @param buffer The byte buffer representing the header data read so far.
+ * @return true if the Buffer matches the protocol format header.
+ */
+ def matchesIdentification(buffer: Buffer): Boolean
+ }
+
+ final val providers: ClassFinder[ProtocolCodecFactory.Provider] = new ClassFinder[ProtocolCodecFactory.Provider]("META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index", classOf[ProtocolCodecFactory.Provider]) // ClassFinder over the protocol-codec-factory.index resource, typed to Provider.
+
+ /**
+ * Gets the provider whose id equals the given name, or null if no provider matches.
+ */
+ def get(name: String): ProtocolCodecFactory.Provider = {
+ import scala.collection.JavaConversions._
+ for (provider <- providers.jsingletons) {
+ if (name == provider.id) {
+ return provider
+ }
+ }
+ return null
+ }
+
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/SSLProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/SSLProtocol.scala?rev=1377147&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/SSLProtocol.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/SSLProtocol.scala Fri Aug 24 23:33:15 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+import org.fusesource.hawtdispatch.transport.SSLProtocolCodec
+import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.broker.Connector
+
+/** Identifiable protocol "ssl": matches a client hello in the first 5 bytes and supplies an SSLProtocolCodec chained to an AnyProtocolCodec.
+ */
+class SSLProtocol extends Protocol {
+ def id(): String = "ssl"
+
+ override def isIdentifiable = true
+ override def maxIdentificaionLength = 5
+ override def matchesIdentification(buffer: Buffer):Boolean = {
+ if( buffer.length >= 5 ) {
+
+ // We have variable header offset.. NOTE(review): only the 2-byte-header (SSLv2-style) hello framing is tested here; a native SSLv3/TLS record starts with content type 0x16 - confirm that is intended.
+ ((buffer.get(0) & 0xC0) == 0x80) && // The remaining bits of bytes 0 and 1 hold the record length.
+ (buffer.get(2) == 1) && // Client Hello
+ (
+ (
+ (buffer.get(3) == 2) // SSLv2
+ ) || (
+ (buffer.get(3) == 3) && // SSLv3 or TLS
+ (buffer.get(4) match { // Minor version
+ case 0 => true // SSLv3
+ case 1 => true // TLSv1.0
+ case 2 => true // TLSv1.1
+ case 3 => true // TLSv1.2
+ case _ => false
+ })
+ )
+ )
+ } else {
+ false
+ }
+ }
+
+ def createProtocolCodec(connector:Connector) = { // Builds the SSL codec using the connector's broker SSL context.
+ val rc = new SSLProtocolCodec()
+ rc.setSSLContext(connector.broker.ssl_context("SSL"))
+ rc.server(SSLProtocolCodec.ClientAuth.NONE) // NOTE(review): presumably server-side mode without client certificate authentication; confirm against hawtdispatch-transport.
+ rc.setNext(new AnyProtocolCodec(connector)) // The next codec in the chain is an AnyProtocolCodec for the same connector.
+ rc
+ }
+
+ def createProtocolHandler = new AnyProtocolHandler
+}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Fri Aug 24 23:33:15 2012
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.apollo.broker.protocol
-import org.fusesource.hawtdispatch.transport.ProtocolCodec
+import org.fusesource.hawtdispatch.transport.{Transport, ProtocolCodec}
import java.nio.ByteBuffer
import org.fusesource.hawtdispatch._
import java.nio.channels.{DatagramChannel, WritableByteChannel, ReadableByteChannel}
@@ -38,8 +38,10 @@ class UdpProtocolCodec extends ProtocolC
def protocol = "udp"
var channel: DatagramChannel = null
- def setReadableByteChannel(channel: ReadableByteChannel) = {
- this.channel = channel.asInstanceOf[DatagramChannel]
+
+
+ def setTransport(transport: Transport) {
+ this.channel = transport.getReadChannel.asInstanceOf[DatagramChannel]
}
var read_counter = 0L
@@ -68,7 +70,6 @@ class UdpProtocolCodec extends ProtocolC
def unread(buffer: Array[Byte]) = throw new UnsupportedOperationException()
// This protocol only supports receiving..
- def setWritableByteChannel(channel: WritableByteChannel) = {}
def write(value: AnyRef) = ProtocolCodec.BufferState.FULL
def full: Boolean = true
def flush = ProtocolCodec.BufferState.FULL
@@ -285,7 +286,7 @@ abstract class UdpProtocolHandler extend
class UdpProtocol extends BaseProtocol {
def id = "udp"
- def createProtocolCodec:ProtocolCodec = new UdpProtocolCodec()
+ def createProtocolCodec(connector:Connector):ProtocolCodec = new UdpProtocolCodec()
def createProtocolHandler:ProtocolHandler = new UdpProtocolHandler {
type ConfigTypeDTO = UdpDTO
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolFactory.scala Fri Aug 24 23:33:15 2012
@@ -18,7 +18,7 @@
package org.apache.activemq.apollo.openwire
import org.apache.activemq.apollo.broker.store.MessageRecord
-import org.apache.activemq.apollo.broker.Message
+import org.apache.activemq.apollo.broker.{Connector, Message}
import OpenwireConstants._
import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, MessageCodec, ProtocolCodecFactory, Protocol}
import org.fusesource.hawtbuf.Buffer
@@ -68,7 +68,7 @@ class OpenwireProtocolCodecFactory exten
def id = PROTOCOL
- def createProtocolCodec() = {
+ def createProtocolCodec(connector:Connector) = {
OpenwireProtocol.log_exerimental_warning
new OpenwireCodec();
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Aug 24 23:33:15 2012
@@ -145,7 +145,7 @@ class OpenwireProtocolHandler extends Pr
super.set_connection(connection)
import collection.JavaConversions._
- codec = connection.transport.getProtocolCodec.asInstanceOf[OpenwireCodec]
+ codec = connection.protocol_codec(classOf[OpenwireCodec])
var connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
config = connector_config.protocols.find( _.isInstanceOf[OpenwireDTO]).map(_.asInstanceOf[OpenwireDTO]).getOrElse(new OpenwireDTO)
@@ -384,7 +384,7 @@ class OpenwireProtocolHandler extends Pr
die("Remote wire format (%s) is lower the minimum version required (%s)".format(info.getVersion(), minimum_protocol_version))
}
- wire_format = connection.transport.getProtocolCodec.asInstanceOf[OpenwireCodec].format
+ wire_format = connection.protocol_codec(classOf[OpenwireCodec]).format
wire_format.renegotiateWireFormat(info, preferred_wireformat_settings)
val inactive_time = preferred_wireformat_settings.getMaxInactivityDuration().min(info.getMaxInactivityDuration())
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Fri Aug 24 23:33:15 2012
@@ -34,7 +34,7 @@ class StompProtocolCodecFactory extends
def id = PROTOCOL
- def createProtocolCodec() = new StompCodec();
+ def createProtocolCodec(connector:Connector) = new StompCodec();
def isIdentifiable() = true
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Aug 24 23:33:15 2012
@@ -663,7 +663,7 @@ class StompProtocolHandler extends Proto
super.set_connection(connection)
import collection.JavaConversions._
- codec = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+ codec = connection.protocol_codec(classOf[StompCodec])
val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
config = connector_config.protocols.find( _.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
@@ -949,7 +949,7 @@ class StompProtocolHandler extends Proto
if( protocol_version != V1_0 ) {
// disable trimming...
- connection.transport.getProtocolCodec.asInstanceOf[StompCodec].trim = false
+ codec.trim = false
}
val heart_beat = get(headers, HEART_BEAT).getOrElse(DEFAULT_HEART_BEAT)
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-ssl.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-ssl.xml?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-ssl.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-ssl.xml Fri Aug 24 23:33:15 2012
@@ -24,5 +24,6 @@
<key_storage file="${basedir}/src/test/resources/apollo.ks" password="password" key_password="password"/>
<connector id="ssl" bind="ssl://0.0.0.0:0" />
+ <connector id="tcp" bind="tcp://0.0.0.0:0" />
</broker>
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala?rev=1377147&r1=1377146&r2=1377147&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala Fri Aug 24 23:33:15 2012
@@ -36,4 +36,8 @@ class StompSslTest extends StompTestSupp
test("Connect over SSL") {
connect("1.1")
}
+
+ test("Detect SSL on TCP port") {
+ connect("1.1", connector="tcp")
+ }
}