You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/01 12:42:25 UTC

svn commit: r1239089 - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/ apollo-openwire/src/main/scala/org/apache/activemq/apoll...

Author: chirino
Date: Wed Feb  1 11:42:25 2012
New Revision: 1239089

URL: http://svn.apache.org/viewvc?rev=1239089&view=rev
Log:
Update to latest hawtdispatch, contains some leaky abstraction fixes also allows a wss:// transport to pass the client credentials to the security system for authentication/authorization.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Feb  1 11:42:25 2012
@@ -36,6 +36,7 @@ import org.fusesource.hawtdispatch.TaskT
 import java.util.concurrent.TimeUnit
 import security.SecuredResource.BrokerKind
 import reflect.BeanProperty
+import java.net.InetSocketAddress
 
 /**
  * <p>
@@ -615,7 +616,10 @@ class Broker() extends BaseService with 
 
   //useful for testing
   def get_connect_address = {
-    Option(config.client_address).getOrElse(first_accepting_connector.get.transport_server.getConnectAddress)
+    Option(config.client_address).getOrElse {
+      val address= get_socket_address.asInstanceOf[InetSocketAddress]
+      "%s:%d".format(address.getHostName, address.getPort)
+    }
   }
 
   def get_socket_address = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Feb  1 11:42:25 2012
@@ -46,7 +46,7 @@ abstract class Connection() extends Base
     transport.setDispatchQueue(dispatch_queue);
     transport.setTransportListener(new TransportListener(){
       def onTransportFailure(error: IOException) = Connection.this.on_transport_failure(error)
-      def onTransportDisconnected(reconnecting:Boolean) = Connection.this.on_transport_disconnected
+      def onTransportDisconnected = Connection.this.on_transport_disconnected
       def onTransportConnected =  Connection.this.on_transport_connected
       def onTransportCommand(command: AnyRef) =  Connection.this.on_transport_command(command)
       def onRefill =  Connection.this.on_refill

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Wed Feb  1 11:42:25 2012
@@ -39,6 +39,7 @@ import java.util.concurrent.ArrayBlockin
 import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
 import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
 import java.io.{EOFException, IOException}
+import java.security.cert.X509Certificate
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -153,8 +154,6 @@ object WebSocketTransportFactory extends
       new URI(uri.getScheme + "://" + uri.getHost + ":" + connector.getLocalPort + prefix).toString
     }
 
-    def getConnectAddress = getBoundAddress
-
     def getSocketAddress = new InetSocketAddress(uri.getHost, connector.getLocalPort)
 
     val pending_connects = new ArrayBlockingQueue[WebSocketTransport](100)
@@ -184,7 +183,7 @@ object WebSocketTransportFactory extends
    *
    */
   case class WebSocketTransport(server: WsTransportServer, request: HttpServletRequest, protocol: String) 
-          extends BaseService with WebSocket.OnTextMessage with WebSocket.OnBinaryMessage with Transport with ScatteringByteChannel with GatheringByteChannel {
+          extends BaseService with WebSocket.OnTextMessage with WebSocket.OnBinaryMessage with SecureTransport with ScatteringByteChannel with GatheringByteChannel {
 
     /////////////////////////////////////////////////////////////////////////
     // Transport interface methods.
@@ -196,6 +195,9 @@ object WebSocketTransportFactory extends
     @BeanProperty
     var transportListener: TransportListener = _
 
+    val certificates = request.getAttribute("javax.servlet.request.X509Certificate").asInstanceOf[Array[X509Certificate]]
+    def getPeerX509Certificates = certificates
+
     var protocolCodec: ProtocolCodec = _
 
     // Seems most browsers don't support binary transfers yet, so only enable it if
@@ -236,21 +238,10 @@ object WebSocketTransportFactory extends
     
     def getLocalAddress = new InetSocketAddress(request.getLocalAddr, request.getLocalPort)
     def getRemoteAddress = new InetSocketAddress(request.getRemoteHost, request.getRemotePort)
-    def getTypeId = server.uri.getScheme
 
     def isConnected = connection == null || connection.isOpen
-    def isDisposed = connection == null
-    def isFaultTolerant = false
-
-    def reconnect(p1: URI) = throw new UnsupportedOperationException()
+    def isClosed = connection == null
 
-    def narrow[T](target: Class[T]): T = {
-      if (target.isAssignableFrom(getClass())) {
-        return target.cast(this);
-      }
-      return null;
-    }
-    
     /////////////////////////////////////////////////////////////////////////
     //
     // WebSocket Lifecycle Callbacks...

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Feb  1 11:42:25 2012
@@ -40,6 +40,8 @@ import protocol._
 import security.SecurityContext
 import DestinationConverter._
 import Buffer._
+import java.net.InetSocketAddress
+import java.security.cert.X509Certificate
 
 object OpenwireProtocolHandler extends Log {
   def unit:Unit = {}
@@ -417,7 +419,9 @@ class OpenwireProtocolHandler extends Pr
     val brokerInfo = new BrokerInfo();
     brokerInfo.setBrokerId(new BrokerId(utf8(host.config.id)));
     brokerInfo.setBrokerName(utf8(host.config.id));
-    brokerInfo.setBrokerURL(utf8(host.broker.get_connect_address));
+
+
+    brokerInfo.setBrokerURL(utf8("tcp://"+host.broker.get_connect_address));
     connection_session.offer(brokerInfo);
   }
 
@@ -430,6 +434,12 @@ class OpenwireProtocolHandler extends Pr
     if (connection_context==null) {
       new ConnectionContext(info).attach
 
+      connection.transport match {
+        case t:SecureTransport=>
+          security_context.certificates = Option(t.getPeerX509Certificates).getOrElse(Array[X509Certificate]())
+        case _ =>
+      }
+
       security_context.user = Option(info.getUserName).map(_.toString).getOrElse(null)
       security_context.password = Option(info.getPassword).map(_.toString).getOrElse(null)
       security_context.session_id = Some(OPENWIRE_PARSER.sanitize_destination_part(info.getConnectionId.toString))

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Feb  1 11:42:25 2012
@@ -37,7 +37,7 @@ import java.security.cert.X509Certificat
 import collection.mutable.{ListBuffer, HashMap}
 import java.io.IOException
 import org.apache.activemq.apollo.dto._
-import org.fusesource.hawtdispatch.transport.{HeartBeatMonitor, SslTransport}
+import org.fusesource.hawtdispatch.transport.{SecureTransport, HeartBeatMonitor, SslTransport}
 
 
 case class RichBuffer(self:Buffer) extends Proxy {
@@ -824,9 +824,9 @@ class StompProtocolHandler extends Proto
   def on_stomp_connect(headers:HeaderMap):Unit = {
 
     connection.transport match {
-      case t:SslTransport=>
+      case t:SecureTransport=>
         security_context.certificates = Option(t.getPeerX509Certificates).getOrElse(Array[X509Certificate]())
-      case _ => None
+      case _ =>
     }
 
     security_context.local_address = connection.transport.getLocalAddress

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1239089&r1=1239088&r2=1239089&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Wed Feb  1 11:42:25 2012
@@ -96,7 +96,7 @@
     <xbean-version>3.4</xbean-version>
     <felix-version>1.0.0</felix-version>
 
-    <hawtdispatch-version>1.8</hawtdispatch-version>
+    <hawtdispatch-version>1.9-SNAPSHOT</hawtdispatch-version>
     <hawtbuf-version>1.8</hawtbuf-version>
     <stompjms-version>1.8</stompjms-version>