You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/12 02:27:47 UTC
svn commit: r1034237 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Author: chirino
Date: Fri Nov 12 01:27:47 2010
New Revision: 1034237
URL: http://svn.apache.org/viewvc?rev=1034237&view=rev
Log:
Updating the stomp protocol to use the security services configured on a virtual host.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1034237&r1=1034236&r2=1034237&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Nov 12 01:27:47 2010
@@ -27,10 +27,10 @@ import collection.{JavaConversions, Sort
import JavaConversions._
import org.apache.activemq.apollo.dto.{VirtualHostStatusDTO, ConnectorStatusDTO, BrokerStatusDTO, BrokerDTO}
import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.ConcurrentHashMap
import org.apache.activemq.apollo.util._
import ReporterLevel._
import collection.mutable.LinkedHashMap
+import java.util.concurrent.{ThreadFactory, Executors, ConcurrentHashMap}
/**
* <p>
@@ -105,6 +105,12 @@ object BrokerRegistry {
object Broker extends Log {
+ // Shared cached thread pool. Every worker it creates is a daemon thread
+ // named "Apollo Worker". NOTE(review): judging by the name, this is presumably
+ // meant for tasks that may block -- confirm against the callers.
+ val BLOCKABLE_THREAD_POOL = Executors.newCachedThreadPool(new ThreadFactory(){
+   def newThread(r: Runnable) = {
+     val worker = new Thread(r, "Apollo Worker")
+     worker.setDaemon(true)
+     worker
+   }
+ })
+
val broker_id_counter = new AtomicLong()
val STICK_ON_THREAD_QUEUES = true
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1034237&r1=1034236&r2=1034237&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Nov 12 01:27:47 2010
@@ -31,6 +31,7 @@ import org.fusesource.hawtbuf.{Buffer, A
import collection.JavaConversions
import java.util.concurrent.atomic.AtomicLong
import org.apache.activemq.apollo.util.OptionSupport._
+import security.{Authenticator, Authorizer}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -95,6 +96,9 @@ class VirtualHost(val broker: Broker, va
val session_counter = new AtomicLong(0)
+ // Security services attached to this virtual host. Both fields are
+ // default-initialised (`= _`, i.e. null for reference types), so users must
+ // null-check them before use.
+ // NOTE(review): where these get assigned is not visible in this change -- confirm.
+ var authenticator:Authenticator = _
+ var authorizer:Authorizer = _
+
override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
/**
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1034237&r1=1034236&r2=1034237&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Fri Nov 12 01:27:47 2010
@@ -25,6 +25,7 @@ import AsciiBuffer._
import org.apache.activemq.apollo.broker._
import java.lang.String
import protocol.{HeartBeatMonitor, ProtocolFactory, Protocol, ProtocolHandler}
+import security.SecurityContext
import Stomp._
import BufferConversions._
import java.io.IOException
@@ -148,7 +149,7 @@ class StompProtocolHandler extends Proto
var consumer_acks = ListBuffer[(AsciiBuffer, (StoreUOW)=>Unit)]()
def track(delivery:Delivery) = {
- queue {
+ queue.apply {
if( protocol_version eq V1_0 ) {
// register on the connection since 1.0 acks may not include the subscription id
connection_ack_handlers += ( delivery.message.id-> this )
@@ -197,7 +198,7 @@ class StompProtocolHandler extends Proto
var consumer_acks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
def track(delivery:Delivery) = {
- queue {
+ queue.apply {
if( protocol_version eq V1_0 ) {
// register on the connection since 1.0 acks may not include the subscription id
connection_ack_handlers += ( delivery.message.id-> this )
@@ -305,18 +306,16 @@ class StompProtocolHandler extends Proto
var session_id:AsciiBuffer = _
var protocol_version:AsciiBuffer = _
- var login:Option[AsciiBuffer] = None
- var passcode:Option[AsciiBuffer] = None
var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
-
+ val security_context = new SecurityContext
var waiting_on:String = "client request"
override def create_connection_status = {
var rc = new StompConnectionStatusDTO
rc.protocol_version = if( protocol_version == null ) null else protocol_version.toString
- rc.user = login.map(_.toString).getOrElse(null)
+ rc.user = security_context.user
rc.subscription_count = consumers.size
rc.waiting_on = waiting_on
rc
@@ -431,10 +430,8 @@ class StompProtocolHandler extends Proto
def on_stomp_connect(headers:HeaderMap):Unit = {
-
-
- login = get(headers, LOGIN)
- passcode = get(headers, PASSCODE)
+ // get(headers, ...) yields an Option; unwrap it so the security context holds
+ // the raw header value (or null when the header is absent) rather than the
+ // Option's own "Some(...)"/"None" string rendering.
+ security_context.user = get(headers, LOGIN).map(_.toString).getOrElse(null)
+ security_context.password = get(headers, PASSCODE).map(_.toString).getOrElse(null)
val accept_versions = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii)
protocol_version = SUPPORTED_PROTOCOL_VERSIONS.find( v=> accept_versions.contains(v) ) match {
@@ -484,6 +481,7 @@ class StompProtocolHandler extends Proto
return
}
+ // CPS no-op: captures the continuation and resumes it immediately. Used in
+ // branches that otherwise contain no suspending call, to make the cps
+ // compiler plugin happy inside the reset block.
+ def noop = shift { k: (Unit=>Unit) => k() }
reset {
suspendRead("virtual host lookup")
val host_header = get(headers, HOST)
@@ -494,25 +492,41 @@ class StompProtocolHandler extends Proto
connection.connector.broker.getVirtualHost(host)
}
resumeRead
- if(host!=null) {
+ if(host==null) {
+ die("Invalid virtual host: "+host_header.get)
+ noop // to make the cps compiler plugin happy.
+ } else {
this.host=host
- val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
- session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
+ var authenticated = true;
- connection_sink.offer(
- StompFrame(CONNECTED, List(
- (VERSION, protocol_version),
- (SESSION, session_id),
- (HEART_BEAT, outbound_heart_beat_header)
- )))
-
- if( this.host.direct_buffer_pool!=null ) {
- val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
- wf.memory_pool = this.host.direct_buffer_pool
+ if( host.authenticator!=null ) {
+ suspendRead("authenticating")
+ authenticated = host.authenticator.authenticate(security_context)
+ resumeRead
+ } else {
+ noop // to make the cps compiler plugin happy.
}
- } else {
- die("Invalid virtual host: "+host_header.get)
+
+ // Reject the connection only when authentication actually failed. When the
+ // host has no authenticator configured, `authenticated` keeps its initial
+ // value of true and the connection is accepted.
+ if( !authenticated ) {
+   die("Authentication failed.")
+ } else {
+   val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
+   session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
+
+   // Reply with a CONNECTED frame carrying the protocol version, the new
+   // session id and the heart-beat settings.
+   connection_sink.offer(
+     StompFrame(CONNECTED, List(
+       (VERSION, protocol_version),
+       (SESSION, session_id),
+       (HEART_BEAT, outbound_heart_beat_header)
+     )))
+
+   // Hand the host's direct buffer pool to the codec when the host has one.
+   if( this.host.direct_buffer_pool!=null ) {
+     val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+     wf.memory_pool = this.host.direct_buffer_pool
+   }
+ }
+
}
}