You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/01 12:42:25 UTC
svn commit: r1239089 - in /activemq/activemq-apollo/trunk: ./
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/
apollo-openwire/src/main/scala/org/apache/activemq/apoll...
Author: chirino
Date: Wed Feb 1 11:42:25 2012
New Revision: 1239089
URL: http://svn.apache.org/viewvc?rev=1239089&view=rev
Log:
Update to latest hawtdispatch, contains some leaky abstraction fixes also allows a wss:// transport to pass the client credentials to the security system for authentication/authorization.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Feb 1 11:42:25 2012
@@ -36,6 +36,7 @@ import org.fusesource.hawtdispatch.TaskT
import java.util.concurrent.TimeUnit
import security.SecuredResource.BrokerKind
import reflect.BeanProperty
+import java.net.InetSocketAddress
/**
* <p>
@@ -615,7 +616,10 @@ class Broker() extends BaseService with
//useful for testing
def get_connect_address = {
- Option(config.client_address).getOrElse(first_accepting_connector.get.transport_server.getConnectAddress)
+ Option(config.client_address).getOrElse {
+ val address= get_socket_address.asInstanceOf[InetSocketAddress]
+ "%s:%d".format(address.getHostName, address.getPort)
+ }
}
def get_socket_address = {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Feb 1 11:42:25 2012
@@ -46,7 +46,7 @@ abstract class Connection() extends Base
transport.setDispatchQueue(dispatch_queue);
transport.setTransportListener(new TransportListener(){
def onTransportFailure(error: IOException) = Connection.this.on_transport_failure(error)
- def onTransportDisconnected(reconnecting:Boolean) = Connection.this.on_transport_disconnected
+ def onTransportDisconnected = Connection.this.on_transport_disconnected
def onTransportConnected = Connection.this.on_transport_connected
def onTransportCommand(command: AnyRef) = Connection.this.on_transport_command(command)
def onRefill = Connection.this.on_refill
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Wed Feb 1 11:42:25 2012
@@ -39,6 +39,7 @@ import java.util.concurrent.ArrayBlockin
import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
import java.io.{EOFException, IOException}
+import java.security.cert.X509Certificate
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -153,8 +154,6 @@ object WebSocketTransportFactory extends
new URI(uri.getScheme + "://" + uri.getHost + ":" + connector.getLocalPort + prefix).toString
}
- def getConnectAddress = getBoundAddress
-
def getSocketAddress = new InetSocketAddress(uri.getHost, connector.getLocalPort)
val pending_connects = new ArrayBlockingQueue[WebSocketTransport](100)
@@ -184,7 +183,7 @@ object WebSocketTransportFactory extends
*
*/
case class WebSocketTransport(server: WsTransportServer, request: HttpServletRequest, protocol: String)
- extends BaseService with WebSocket.OnTextMessage with WebSocket.OnBinaryMessage with Transport with ScatteringByteChannel with GatheringByteChannel {
+ extends BaseService with WebSocket.OnTextMessage with WebSocket.OnBinaryMessage with SecureTransport with ScatteringByteChannel with GatheringByteChannel {
/////////////////////////////////////////////////////////////////////////
// Transport interface methods.
@@ -196,6 +195,9 @@ object WebSocketTransportFactory extends
@BeanProperty
var transportListener: TransportListener = _
+ val certificates = request.getAttribute("javax.servlet.request.X509Certificate").asInstanceOf[Array[X509Certificate]]
+ def getPeerX509Certificates = certificates
+
var protocolCodec: ProtocolCodec = _
// Seems most browsers don't support binary transfers yet, so only enable it if
@@ -236,21 +238,10 @@ object WebSocketTransportFactory extends
def getLocalAddress = new InetSocketAddress(request.getLocalAddr, request.getLocalPort)
def getRemoteAddress = new InetSocketAddress(request.getRemoteHost, request.getRemotePort)
- def getTypeId = server.uri.getScheme
def isConnected = connection == null || connection.isOpen
- def isDisposed = connection == null
- def isFaultTolerant = false
-
- def reconnect(p1: URI) = throw new UnsupportedOperationException()
+ def isClosed = connection == null
- def narrow[T](target: Class[T]): T = {
- if (target.isAssignableFrom(getClass())) {
- return target.cast(this);
- }
- return null;
- }
-
/////////////////////////////////////////////////////////////////////////
//
// WebSocket Lifecycle Callbacks...
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Feb 1 11:42:25 2012
@@ -40,6 +40,8 @@ import protocol._
import security.SecurityContext
import DestinationConverter._
import Buffer._
+import java.net.InetSocketAddress
+import java.security.cert.X509Certificate
object OpenwireProtocolHandler extends Log {
def unit:Unit = {}
@@ -417,7 +419,9 @@ class OpenwireProtocolHandler extends Pr
val brokerInfo = new BrokerInfo();
brokerInfo.setBrokerId(new BrokerId(utf8(host.config.id)));
brokerInfo.setBrokerName(utf8(host.config.id));
- brokerInfo.setBrokerURL(utf8(host.broker.get_connect_address));
+
+
+ brokerInfo.setBrokerURL(utf8("tcp://"+host.broker.get_connect_address));
connection_session.offer(brokerInfo);
}
@@ -430,6 +434,12 @@ class OpenwireProtocolHandler extends Pr
if (connection_context==null) {
new ConnectionContext(info).attach
+ connection.transport match {
+ case t:SecureTransport=>
+ security_context.certificates = Option(t.getPeerX509Certificates).getOrElse(Array[X509Certificate]())
+ case _ =>
+ }
+
security_context.user = Option(info.getUserName).map(_.toString).getOrElse(null)
security_context.password = Option(info.getPassword).map(_.toString).getOrElse(null)
security_context.session_id = Some(OPENWIRE_PARSER.sanitize_destination_part(info.getConnectionId.toString))
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Feb 1 11:42:25 2012
@@ -37,7 +37,7 @@ import java.security.cert.X509Certificat
import collection.mutable.{ListBuffer, HashMap}
import java.io.IOException
import org.apache.activemq.apollo.dto._
-import org.fusesource.hawtdispatch.transport.{HeartBeatMonitor, SslTransport}
+import org.fusesource.hawtdispatch.transport.{SecureTransport, HeartBeatMonitor, SslTransport}
case class RichBuffer(self:Buffer) extends Proxy {
@@ -824,9 +824,9 @@ class StompProtocolHandler extends Proto
def on_stomp_connect(headers:HeaderMap):Unit = {
connection.transport match {
- case t:SslTransport=>
+ case t:SecureTransport=>
security_context.certificates = Option(t.getPeerX509Certificates).getOrElse(Array[X509Certificate]())
- case _ => None
+ case _ =>
}
security_context.local_address = connection.transport.getLocalAddress
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Wed Feb 1 11:42:25 2012
@@ -96,7 +96,7 @@
<xbean-version>3.4</xbean-version>
<felix-version>1.0.0</felix-version>
- <hawtdispatch-version>1.8</hawtdispatch-version>
+ <hawtdispatch-version>1.9-SNAPSHOT</hawtdispatch-version>
<hawtbuf-version>1.8</hawtbuf-version>
<stompjms-version>1.8</stompjms-version>