You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/12 02:27:47 UTC

svn commit: r1034237 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/

Author: chirino
Date: Fri Nov 12 01:27:47 2010
New Revision: 1034237

URL: http://svn.apache.org/viewvc?rev=1034237&view=rev
Log:
Updating the stomp protocol to use the security services configured on a virtual host.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1034237&r1=1034236&r2=1034237&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Nov 12 01:27:47 2010
@@ -27,10 +27,10 @@ import collection.{JavaConversions, Sort
 import JavaConversions._
 import org.apache.activemq.apollo.dto.{VirtualHostStatusDTO, ConnectorStatusDTO, BrokerStatusDTO, BrokerDTO}
 import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.activemq.apollo.util._
 import ReporterLevel._
 import collection.mutable.LinkedHashMap
+import java.util.concurrent.{ThreadFactory, Executors, ConcurrentHashMap}
 
 /**
  * <p>
@@ -105,6 +105,12 @@ object BrokerRegistry {
 
 object Broker extends Log {
 
+  val BLOCKABLE_THREAD_POOL = Executors.newCachedThreadPool(new ThreadFactory(){ // shared cached pool; NOTE(review): name suggests it is for tasks that may block -- confirm
+    def newThread(r: Runnable) = new Thread(r, "Apollo Worker") {
+      setDaemon(true) // daemon threads: idle pool workers do not keep the JVM from exiting
+    }
+  })
+
   val broker_id_counter = new AtomicLong()
 
   val STICK_ON_THREAD_QUEUES = true

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1034237&r1=1034236&r2=1034237&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Nov 12 01:27:47 2010
@@ -31,6 +31,7 @@ import org.fusesource.hawtbuf.{Buffer, A
 import collection.JavaConversions
 import java.util.concurrent.atomic.AtomicLong
 import org.apache.activemq.apollo.util.OptionSupport._
+import security.{Authenticator, Authorizer}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -95,6 +96,9 @@ class VirtualHost(val broker: Broker, va
 
   val session_counter = new AtomicLong(0)
 
+  var authenticator:Authenticator = _
+  var authorizer:Authorizer = _
+
   override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
 
   /**

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1034237&r1=1034236&r2=1034237&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Fri Nov 12 01:27:47 2010
@@ -25,6 +25,7 @@ import AsciiBuffer._
 import org.apache.activemq.apollo.broker._
 import java.lang.String
 import protocol.{HeartBeatMonitor, ProtocolFactory, Protocol, ProtocolHandler}
+import security.SecurityContext
 import Stomp._
 import BufferConversions._
 import java.io.IOException
@@ -148,7 +149,7 @@ class StompProtocolHandler extends Proto
     var consumer_acks = ListBuffer[(AsciiBuffer, (StoreUOW)=>Unit)]()
 
     def track(delivery:Delivery) = {
-      queue {
+      queue.apply {
         if( protocol_version eq V1_0 ) {
           // register on the connection since 1.0 acks may not include the subscription id
           connection_ack_handlers += ( delivery.message.id-> this )
@@ -197,7 +198,7 @@ class StompProtocolHandler extends Proto
     var consumer_acks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
 
     def track(delivery:Delivery) = {
-      queue {
+      queue.apply {
         if( protocol_version eq V1_0 ) {
           // register on the connection since 1.0 acks may not include the subscription id
           connection_ack_handlers += ( delivery.message.id-> this )
@@ -305,18 +306,16 @@ class StompProtocolHandler extends Proto
 
   var session_id:AsciiBuffer = _
   var protocol_version:AsciiBuffer = _
-  var login:Option[AsciiBuffer] = None
-  var passcode:Option[AsciiBuffer] = None
 
   var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
-
+  val security_context = new SecurityContext
   var waiting_on:String = "client request"
 
 
   override def create_connection_status = {
     var rc = new StompConnectionStatusDTO
     rc.protocol_version = if( protocol_version == null ) null else protocol_version.toString
-    rc.user = login.map(_.toString).getOrElse(null)
+    rc.user = security_context.user
     rc.subscription_count = consumers.size
     rc.waiting_on = waiting_on
     rc
@@ -431,10 +430,8 @@ class StompProtocolHandler extends Proto
 
   def on_stomp_connect(headers:HeaderMap):Unit = {
 
-
-
-    login = get(headers, LOGIN)
-    passcode = get(headers, PASSCODE)
+    security_context.user = get(headers, LOGIN).map(_.toString).getOrElse(null) // unwrap the Option; null when the header is absent
+    security_context.password = get(headers, PASSCODE).map(_.toString).getOrElse(null)
 
     val accept_versions = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii)
     protocol_version = SUPPORTED_PROTOCOL_VERSIONS.find( v=> accept_versions.contains(v) ) match {
@@ -484,6 +481,7 @@ class StompProtocolHandler extends Proto
         return
     }
 
+    def noop = shift {  k: (Unit=>Unit) => k() }
     reset {
       suspendRead("virtual host lookup")
       val host_header = get(headers, HOST)
@@ -494,25 +492,41 @@ class StompProtocolHandler extends Proto
           connection.connector.broker.getVirtualHost(host)
       }
       resumeRead
-      if(host!=null) {
+      if(host==null) {
+        die("Invalid virtual host: "+host_header.get)
+        noop // to make the cps compiler plugin happy.
+      } else {
         this.host=host
 
-        val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
-        session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
+        var authenticated = true;
 
-        connection_sink.offer(
-          StompFrame(CONNECTED, List(
-            (VERSION, protocol_version),
-            (SESSION, session_id),
-            (HEART_BEAT, outbound_heart_beat_header)
-          )))
-
-        if( this.host.direct_buffer_pool!=null ) {
-          val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
-          wf.memory_pool = this.host.direct_buffer_pool
+        if( host.authenticator!=null ) {
+          suspendRead("authenticating")
+          authenticated = host.authenticator.authenticate(security_context)
+          resumeRead
+        } else {
+          noop // to make the cps compiler plugin happy.
         }
-      } else {
-        die("Invalid virtual host: "+host_header.get)
+
+        if( !authenticated ) { // reject the session only when authentication did not succeed
+            die("Authentication failed.")
+        } else {
+          val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
+          session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
+
+          connection_sink.offer(
+            StompFrame(CONNECTED, List(
+              (VERSION, protocol_version),
+              (SESSION, session_id),
+              (HEART_BEAT, outbound_heart_beat_header)
+            )))
+
+          if( this.host.direct_buffer_pool!=null ) {
+            val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+            wf.memory_pool = this.host.direct_buffer_pool
+          }
+        }
+
       }
     }