You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/01 05:36:05 UTC
svn commit: r1238933 - in /activemq/activemq-apollo/trunk:
apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/br...
Author: chirino
Date: Wed Feb 1 04:36:04 2012
New Revision: 1238933
URL: http://svn.apache.org/viewvc?rev=1238933&view=rev
Log:
Stopped using the continuations API and compiler plugin. It just added too much voodoo and therefore increased project complexity.
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authenticator.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-scala/pom.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Wed Feb 1 04:36:04 2012
@@ -24,10 +24,9 @@ import atomic.{AtomicReference, AtomicLo
import org.apache.activemq.apollo.broker.store._
import org.apache.activemq.apollo.util._
import org.fusesource.hawtdispatch.ListEventAggregator
-import org.apache.activemq.apollo.dto.{StoreStatusDTO, IntMetricDTO, TimeMetricDTO, StoreDTO}
+import org.apache.activemq.apollo.dto.StoreStatusDTO
import org.apache.activemq.apollo.util.OptionSupport._
-import java.io.{InputStream, OutputStream, File}
-import scala.util.continuations._
+import java.io.{InputStream, OutputStream}
import org.fusesource.hawtbuf.Buffer
/**
@@ -42,7 +41,6 @@ object BDBStore extends Log {
*/
class BDBStore(var config:BDBStoreDTO) extends DelayingStoreSupport {
- import BDBStore._
var next_queue_key = new AtomicLong(1)
var next_msg_key = new AtomicLong(1)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Feb 1 04:36:04 2012
@@ -603,11 +603,13 @@ class Broker() extends BaseService with
}
}
- def get_virtual_host(name: AsciiBuffer) = dispatch_queue ! {
+ def get_virtual_host(name: AsciiBuffer) = {
+ dispatch_queue.assertExecuting()
virtual_hosts_by_hostname.getOrElse(name, null)
}
- def get_default_virtual_host = dispatch_queue ! {
+ def get_default_virtual_host = {
+ dispatch_queue.assertExecuting()
default_virtual_host
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Feb 1 04:36:04 2012
@@ -1008,49 +1008,44 @@ class LocalRouter(val virtual_host:Virtu
def topic_domain:Domain[_ <: DomainDestination, TopicDestinationDTO] = local_topic_domain
def dsub_domain:Domain[_ <: DomainDestination, DurableSubscriptionDestinationDTO] = local_dsub_domain
- def bind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, security: SecurityContext) = {
- consumer.retain
- dispatch_queue ! {
- var rc:Option[String] = None
- if(rc.isEmpty && !virtual_host.service_state.is_started) {
- rc = Some("virtual host stopped.")
- } else if (rc.isEmpty) {
- try {
- val actions = destinations.map { destination =>
- destination match {
- case destination:TopicDestinationDTO =>
- val path = destination_parser.decode_path(destination.path)
- val allowed = topic_domain.can_bind_all(path, destination, consumer, security)
- def perform() = topic_domain.bind(path, destination, consumer, security)
- (allowed, perform _)
- case destination:QueueDestinationDTO =>
- val path = destination_parser.decode_path(destination.path)
- val allowed = queue_domain.can_bind_all(path, destination, consumer, security)
- def perform() = queue_domain.bind(path, destination, consumer, security)
- (allowed, perform _)
- case destination:DurableSubscriptionDestinationDTO =>
- val path = Path(destination.subscription_id())
- val allowed = dsub_domain.can_bind_all(path, destination, consumer, security)
- def perform() = dsub_domain.bind(path, destination, consumer, security)
- (allowed, perform _)
- case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
- }
+ def bind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, security: SecurityContext):Option[String] = {
+ dispatch_queue.assertExecuting()
+ if(!virtual_host.service_state.is_started) {
+ return Some("virtual host stopped.")
+ } else {
+ try {
+ val actions = destinations.map { destination =>
+ destination match {
+ case destination:TopicDestinationDTO =>
+ val path = destination_parser.decode_path(destination.path)
+ val allowed = topic_domain.can_bind_all(path, destination, consumer, security)
+ def perform() = topic_domain.bind(path, destination, consumer, security)
+ (allowed, perform _)
+ case destination:QueueDestinationDTO =>
+ val path = destination_parser.decode_path(destination.path)
+ val allowed = queue_domain.can_bind_all(path, destination, consumer, security)
+ def perform() = queue_domain.bind(path, destination, consumer, security)
+ (allowed, perform _)
+ case destination:DurableSubscriptionDestinationDTO =>
+ val path = Path(destination.subscription_id())
+ val allowed = dsub_domain.can_bind_all(path, destination, consumer, security)
+ def perform() = dsub_domain.bind(path, destination, consumer, security)
+ (allowed, perform _)
+ case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
}
+ }
- val failures = actions.flatMap(_._1)
- rc = if( !failures.isEmpty ) {
- Some(failures.mkString("; "))
- } else {
- actions.foreach(_._2())
- None
- }
- } catch {
- case x:PathException =>
- rc = Some(x.getMessage)
+ val failures = actions.flatMap(_._1)
+ if( !failures.isEmpty ) {
+ return Some(failures.mkString("; "))
+ } else {
+ actions.foreach(_._2())
+ return None
}
+ } catch {
+ case x:PathException =>
+ return Some(x.getMessage)
}
- consumer.release
- rc
}
}
@@ -1072,14 +1067,13 @@ class LocalRouter(val virtual_host:Virtu
}
}
- def connect(destinations: Array[DestinationDTO], producer: BindableDeliveryProducer, security: SecurityContext) = {
+ def connect(destinations: Array[DestinationDTO], producer: BindableDeliveryProducer, security: SecurityContext):Option[String] = {
+ dispatch_queue.assertExecuting()
producer.retain
- dispatch_queue ! {
- var rc:Option[String] = None
- if(rc.isEmpty && !virtual_host.service_state.is_started) {
- rc = Some("virtual host stopped.")
- } else if(rc.isEmpty) {
-
+ try {
+ if(!virtual_host.service_state.is_started) {
+ return Some("virtual host stopped.")
+ } else {
val actions = destinations.map { destination =>
destination match {
case destination:TopicDestinationDTO =>
@@ -1100,19 +1094,19 @@ class LocalRouter(val virtual_host:Virtu
case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
}
}
-
+
val failures = actions.flatMap(_._1)
- rc = if( !failures.isEmpty ) {
- Some(failures.mkString("; "))
+ if( !failures.isEmpty ) {
+ return Some(failures.mkString("; "))
} else {
actions.foreach(_._2())
producer.connected()
producer.retain()
- None
+ return None
}
}
+ } finally {
producer.release
- rc
}
}
@@ -1134,9 +1128,10 @@ class LocalRouter(val virtual_host:Virtu
}
}
- def create(destinations:Array[DestinationDTO], security: SecurityContext) = dispatch_queue ! {
+ def create(destinations:Array[DestinationDTO], security: SecurityContext):Option[String] = {
+ dispatch_queue.assertExecuting()
if(!virtual_host.service_state.is_started) {
- Some("virtual host stopped.")
+ return Some("virtual host stopped.")
} else {
val actions = destinations.map { destination =>
@@ -1162,17 +1157,18 @@ class LocalRouter(val virtual_host:Virtu
val failures = actions.flatMap(_._1)
if( !failures.isEmpty ) {
- Some(failures.mkString("; "))
+ return Some(failures.mkString("; "))
} else {
actions.foreach(_._2())
- None
+ return None
}
}
}
- def delete(destinations:Array[DestinationDTO], security: SecurityContext) = dispatch_queue ! {
+ def delete(destinations:Array[DestinationDTO], security: SecurityContext):Option[String] = {
+ dispatch_queue.assertExecuting()
if(!virtual_host.service_state.is_started) {
- Some("virtual host stopped.")
+ return Some("virtual host stopped.")
} else {
val actions = destinations.map { destination =>
@@ -1198,12 +1194,11 @@ class LocalRouter(val virtual_host:Virtu
val failures = actions.flatMap(_._1)
if( !failures.isEmpty ) {
- Some(failures.mkString("; "))
+ return Some(failures.mkString("; "))
} else {
actions.foreach(_._2())
- None
+ return None
}
-
}
}
@@ -1249,7 +1244,8 @@ class LocalRouter(val virtual_host:Virtu
/**
* Gets an existing queue.
*/
- def get_queue(id:Long) = dispatch_queue ! {
+ def get_queue(id:Long) = {
+ dispatch_queue.assertExecuting()
queues_by_store_id.get(id)
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Feb 1 04:36:04 2012
@@ -22,8 +22,7 @@ import scala.collection.immutable.List
import org.apache.activemq.apollo.dto._
import security.SecurityContext
import store.StoreUOW
-import util.continuations._
-import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
+import java.util.concurrent.atomic.AtomicReference
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -32,19 +31,19 @@ trait Router extends Service {
def virtual_host:VirtualHost
- def get_queue(dto:Long):Option[Queue] @suspendable
+ def get_queue(dto:Long):Option[Queue]
- def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext) : Option[String] @suspendable
+ def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext): Option[String]
def unbind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, persistent:Boolean, security:SecurityContext)
- def connect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer, security:SecurityContext): Option[String] @suspendable
+ def connect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer, security:SecurityContext): Option[String]
def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer)
- def delete(destinations:Array[DestinationDTO], security:SecurityContext): Option[String] @suspendable
+ def delete(destinations:Array[DestinationDTO], security:SecurityContext): Option[String]
- def create(destinations:Array[DestinationDTO], security:SecurityContext): Option[String] @suspendable
+ def create(destinations:Array[DestinationDTO], security:SecurityContext): Option[String]
def apply_update(on_completed:Runnable):Unit
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authenticator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authenticator.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authenticator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authenticator.scala Wed Feb 1 04:36:04 2012
@@ -15,7 +15,6 @@
* limitations under the License.
*/
package org.apache.activemq.apollo.broker.security
-import scala.util.continuations._
/**
* <p>
@@ -32,7 +31,7 @@ trait Authenticator {
* @returns null if the SecurityContext was authenticated. Otherwise
* returns an error message that can be given to a client.
*/
- def authenticate(ctx:SecurityContext):String @suspendable
+ def authenticate(ctx:SecurityContext)(cb:(String)=>Unit)
/**
* Extracts the user name of the logged in user.
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala Wed Feb 1 04:36:04 2012
@@ -63,8 +63,8 @@ class JaasAuthenticator(val config: Auth
* execute on the global thread pool since JAAS requests could
* potentially perform a blocking wait (e.g. LDAP request).
*/
- def authenticate(security_ctx: SecurityContext) = BLOCKABLE_THREAD_POOL ! {
- _authenticate(security_ctx)
+ def authenticate(security_ctx: SecurityContext)(cb:(String)=>Unit) = BLOCKABLE_THREAD_POOL {
+ cb(_authenticate(security_ctx))
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala Wed Feb 1 04:36:04 2012
@@ -16,10 +16,7 @@
*/
package org.apache.activemq.apollo.broker
-import org.fusesource.hawtbuf.Buffer._
-import scala.util.continuations._
import org.apache.activemq.apollo.util.{ServiceControl, FunSuiteSupport}
-import org.apache.activemq.apollo.dto._
/**
* <p>
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala Wed Feb 1 04:36:04 2012
@@ -26,10 +26,8 @@ import org.apache.activemq.apollo.util._
import org.fusesource.hawtdispatch.ListEventAggregator
import org.apache.activemq.apollo.dto.StoreStatusDTO
import org.apache.activemq.apollo.util.OptionSupport._
-import scala.util.continuations._
import java.io._
import org.apache.activemq.apollo.web.resources.ViewHelper
-import collection.mutable.ListBuffer
import org.fusesource.hawtbuf.Buffer
/**
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Wed Feb 1 04:36:04 2012
@@ -22,7 +22,6 @@ import org.apache.activemq.apollo.broker
import collection.mutable.HashMap
import DestinationConverter._
import support.advisory.AdvisorySupport
-import scala.util.continuations._
import org.apache.activemq.apollo.util._
import java.util.Map.Entry
import org.apache.activemq.apollo.broker._
@@ -184,13 +183,10 @@ class DestinationAdvisoryRouterListener(
// create the producer route...
val route = new ProducerRoute
producerRoutes.put(key, route)
- reset {
- val rc = router.connect(dest, route, null)
- rc match {
- case Some(failure) =>
- warn("Could not connect to advisory topic: " + message.getDestination)
- case None =>
- }
+ val rc = router.connect(dest, route, null)
+ rc match {
+ case Some(failure) => warn("Could not connect to advisory topic: " + message.getDestination)
+ case None =>
}
route
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Feb 1 04:36:04 2012
@@ -30,7 +30,6 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.util._
import java.util.concurrent.TimeUnit
import java.util.Map.Entry
-import scala.util.continuations._
import org.fusesource.hawtdispatch.transport._
import codec.OpenWireFormat
import command._
@@ -196,13 +195,16 @@ class OpenwireProtocolHandler extends Pr
connection.transport.offer(preferred_wireformat_settings)
resume_read
- reset {
- suspend_read("virtual host lookup")
- this.host = broker.get_default_virtual_host
- connection_log = this.host.connection_log
- resume_read
- if(host==null) {
- async_die("Could not find default virtual host")
+ suspend_read("virtual host lookup")
+ broker.dispatch_queue {
+ var host = broker.get_default_virtual_host
+ dispatchQueue {
+ this.host = host
+ connection_log = this.host.connection_log
+ resume_read
+ if(host==null) {
+ async_die("Could not find default virtual host")
+ }
}
}
}
@@ -432,28 +434,24 @@ class OpenwireProtocolHandler extends Pr
security_context.password = Option(info.getPassword).map(_.toString).getOrElse(null)
security_context.session_id = Some(OPENWIRE_PARSER.sanitize_destination_part(info.getConnectionId.toString))
- reset {
- if( host.authenticator!=null && host.authorizer!=null ) {
- suspend_read("authenticating and authorizing connect")
- val auth_failure = host.authenticator.authenticate(security_context)
- if( auth_failure!=null ) {
- async_die(auth_failure+". Credentials="+security_context.credential_dump)
- noop // to make the cps compiler plugin happy.
- } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
- async_die("Not authorized to connect to connector '%s'. Principals=".format(connection.connector.id, security_context.principal_dump))
- noop // to make the cps compiler plugin happy.
- } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
- async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
- noop // to make the cps compiler plugin happy.
- } else {
- resume_read
- ack(info);
- noop
+ if( host.authenticator!=null && host.authorizer!=null ) {
+ suspend_read("authenticating and authorizing connect")
+ host.authenticator.authenticate(security_context) { auth_failure =>
+ dispatchQueue {
+ if( auth_failure!=null ) {
+ async_die(auth_failure+". Credentials="+security_context.credential_dump)
+ } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
+ async_die("Not authorized to connect to connector '%s'. Principals=".format(connection.connector.id, security_context.principal_dump))
+ } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
+ async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
+ } else {
+ resume_read
+ ack(info);
+ }
}
- } else {
- ack(info);
- noop
}
+ } else {
+ ack(info);
}
} else {
ack(info);
@@ -493,18 +491,20 @@ class OpenwireProtocolHandler extends Pr
// if( info.getDestination.isTemporary ) {
// destinations.foreach(_.temp_owner = connection.id)
// }
- reset{
+ host.dispatch_queue {
val rc = info.getOperationType match {
case DestinationInfo.ADD_OPERATION_TYPE=>
host.router.create(destinations, security_context)
case DestinationInfo.REMOVE_OPERATION_TYPE=>
host.router.delete(destinations, security_context)
}
- rc match {
- case None =>
- ack(info)
- case Some(error)=>
- ack(info)
+ dispatchQueue {
+ rc match {
+ case None =>
+ ack(info)
+ case Some(error)=>
+ ack(info)
+ }
}
}
}
@@ -619,17 +619,19 @@ class OpenwireProtocolHandler extends Pr
// don't process frames until producer is connected...
connection.transport.suspendRead
- reset {
+ host.dispatch_queue {
val rc = host.router.connect(destiantion, route, security_context)
- rc match {
- case Some(failure) =>
- async_die(failure, msg)
- case None =>
- if (!connection.stopped) {
- resume_read
- producerRoutes.put(key, route)
- send_via_route(route, msg, uow)
- }
+ dispatchQueue {
+ rc match {
+ case Some(failure) =>
+ async_die(failure, msg)
+ case None =>
+ if (!connection.stopped) {
+ resume_read
+ producerRoutes.put(key, route)
+ send_via_route(route, msg, uow)
+ }
+ }
}
}
@@ -751,8 +753,6 @@ class OpenwireProtocolHandler extends Pr
}
}
- def noop = shift { k: (Unit=>Unit) => k() }
-
class ProducerContext(val parent: SessionContext, val info: ProducerInfo) {
def attach = {
parent.producers.put(info.getProducerId, this)
@@ -875,18 +875,18 @@ class OpenwireProtocolHandler extends Pr
destination = Array(rc)
}
- reset {
+ host.dispatch_queue {
val rc = host.router.bind(destination, this, security_context)
- rc match {
- case None =>
- ack(info)
- noop
- case Some(reason) =>
- async_fail(reason, info)
- noop
+ this.release
+ dispatchQueue {
+ rc match {
+ case None =>
+ ack(info)
+ case Some(reason) =>
+ async_fail(reason, info)
+ }
}
}
- this.release
}
def dettach = {
@@ -962,13 +962,13 @@ class OpenwireProtocolHandler extends Pr
}
})
if( info.getDestination.isTemporary ) {
- reset {
+ dispatch_queue {
val rc = host.router.delete(destination, security_context)
- rc match {
- case Some(error) =>
- async_die(error)
- case None =>
- unit
+ dispatchQueue {
+ rc match {
+ case Some(error) => async_die(error)
+ case None =>
+ }
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-scala/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-scala/pom.xml?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-scala/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-scala/pom.xml Wed Feb 1 04:36:04 2012
@@ -82,15 +82,9 @@
<configuration>
<args>
<arg>-deprecation</arg>
- <arg>-P:continuations:enable</arg>
</args>
<compilerPlugins>
<compilerPlugin>
- <groupId>org.scala-lang.plugins</groupId>
- <artifactId>continuations</artifactId>
- <version>${scala-version}</version>
- </compilerPlugin>
- <compilerPlugin>
<groupId>org.fusesource.jvmassert</groupId>
<artifactId>jvmassert</artifactId>
<version>1.2</version>
@@ -141,15 +135,9 @@
<configuration>
<args>
<arg>-deprecation</arg>
- <arg>-P:continuations:enable</arg>
</args>
<compilerPlugins>
<compilerPlugin>
- <groupId>org.scala-lang.plugins</groupId>
- <artifactId>continuations</artifactId>
- <version>${scala-version}</version>
- </compilerPlugin>
- <compilerPlugin>
<groupId>org.fusesource.jvmassert</groupId>
<artifactId>jvmassert</artifactId>
<version>1.2</version>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Feb 1 04:36:04 2012
@@ -33,8 +33,6 @@ import org.apache.activemq.apollo.util._
import java.util.concurrent.TimeUnit
import java.util.Map.Entry
import path.PathParser
-import path.PathParser._
-import scala.util.continuations._
import java.security.cert.X509Certificate
import collection.mutable.{ListBuffer, HashMap}
import java.io.IOException
@@ -55,7 +53,6 @@ object BufferSupport {
implicit def to_rich_buffer(value:Buffer):RichBuffer = RichBuffer(value)
}
-import BufferSupport._
object StompProtocolHandler extends Log {
@@ -78,9 +75,6 @@ object StompProtocolHandler extends Log
val WAITING_ON_SHUTDOWN: () => String = () => {
"shutdown"
}
-
- def noop = shift { k: (Unit=>Unit) => k() }
- def unit:Unit = {}
}
/**
@@ -908,53 +902,48 @@ class StompProtocolHandler extends Proto
codec.direct_buffer_allocator = this.host.direct_buffer_allocator
}
- reset {
- suspend_read("virtual host lookup")
- val host_header = get(headers, HOST)
+ suspend_read("virtual host lookup")
+ val host_header = get(headers, HOST)
+
+ broker.dispatch_queue {
val host = host_header match {
- case None=>
- connection.connector.broker.get_default_virtual_host
- case Some(host)=>
- connection.connector.broker.get_virtual_host(host)
- }
- resume_read
-
- if(host==null) {
- async_die("Invalid virtual host: "+host_header.get)
- noop
- } else if(!host.service_state.is_started) {
- var headers = (MESSAGE_HEADER, encode_header("Virtual host stopped")) :: Nil
- host.client_redirect.foreach(x=> headers ::= REDIRECT_HEADER->encode_header(x) )
- async_die(headers, "")
- noop
- } else {
- this.host=host
- security_context.session_id = Some("%s-%x".format(destination_parser.sanitize_destination_part(this.host.config.id), this.host.session_counter.incrementAndGet))
- connection_log = host.connection_log
- if( host.authenticator!=null && host.authorizer!=null ) {
- suspend_read("authenticating and authorizing connect")
- var auth_failure = host.authenticator.authenticate(security_context)
- if( auth_failure!=null ) {
- async_die(auth_failure+". Credentials="+security_context.credential_dump)
- noop // to make the cps compiler plugin happy.
- } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
- async_die("Not authorized to connect to connector '%s'. Principals=".format(connection.connector.id, security_context.principal_dump))
- noop // to make the cps compiler plugin happy.
- } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
- async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
- noop // to make the cps compiler plugin happy.
+ case None=> broker.default_virtual_host
+ case Some(host)=> broker.get_virtual_host(host)
+ }
+ dispatchQueue {
+ resume_read
+ if(host==null) {
+ async_die("Invalid virtual host: "+host_header.get)
+ } else if(!host.service_state.is_started) {
+ var headers = (MESSAGE_HEADER, encode_header("Virtual host stopped")) :: Nil
+ host.client_redirect.foreach(x=> headers ::= REDIRECT_HEADER->encode_header(x) )
+ async_die(headers, "")
+ } else {
+ this.host=host
+ security_context.session_id = Some("%s-%x".format(destination_parser.sanitize_destination_part(this.host.config.id), this.host.session_counter.incrementAndGet))
+ connection_log = host.connection_log
+ if( host.authenticator!=null && host.authorizer!=null ) {
+ suspend_read("authenticating and authorizing connect")
+ host.authenticator.authenticate(security_context) { auth_failure=>
+ dispatchQueue {
+ if( auth_failure!=null ) {
+ async_die(auth_failure+". Credentials="+security_context.credential_dump)
+ } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
+ async_die("Not authorized to connect to connector '%s'. Principals=".format(connection.connector.id, security_context.principal_dump))
+ } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
+ async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
+ } else {
+ resume_read
+ send_connected
+ }
+ }
+ }
} else {
- resume_read
send_connected
- noop // to make the cps compiler plugin happy.
}
- } else {
- send_connected
- noop // to make the cps compiler plugin happy.
}
}
}
-
}
def get(headers:HeaderMap, names:List[AsciiBuffer]):List[Option[AsciiBuffer]] = {
@@ -1013,17 +1002,19 @@ class StompProtocolHandler extends Proto
// don't process frames until producer is connected...
connection.transport.suspendRead
- reset {
+ host.dispatch_queue {
val rc = host.router.connect(destination, route, security_context)
- rc match {
- case Some(failure) =>
- async_die(failure)
- case None =>
- if (!connection.stopped) {
- resume_read
- producerRoutes.put(key, route)
- send_via_route(destination, route, frame, uow)
- }
+ dispatchQueue {
+ rc match {
+ case Some(failure) =>
+ async_die(failure)
+ case None =>
+ if (!connection.stopped) {
+ resume_read
+ producerRoutes.put(key, route)
+ send_via_route(destination, route, frame, uow)
+ }
+ }
}
}
@@ -1258,38 +1249,19 @@ class StompProtocolHandler extends Proto
val consumer = new StompConsumer(subscription_id, destination, ack_mode, selector, browser, exclusive, credit_window, include_seq, from_seq, browser_end);
consumers += (id -> consumer)
- reset {
+ host.dispatch_queue {
val rc = host.router.bind(destination, consumer, security_context)
consumer.release
- rc match {
- case Some(reason)=>
- consumers -= id
- async_die(reason)
- case None =>
- send_receipt(headers)
- unit
+ dispatchQueue {
+ rc match {
+ case Some(reason)=>
+ consumers -= id
+ async_die(reason)
+ case None =>
+ send_receipt(headers)
+ }
}
}
-
-// reset {
-// // create a queue and bind the consumer to it.
-// val x= host.router.get_or_create_queue(binding, security_context)
-// x match {
-// case Success(queue) =>
-// val rc = queue.bind(consumer, security_context)
-// consumer.release
-// rc match {
-// case Failure(reason)=>
-// consumers -= id
-// async_die(reason)
-// case _ =>
-// send_receipt(headers)
-// }
-// case Failure(reason) =>
-// consumers -= id
-// async_die(reason)
-// }
-// }
}
def on_stomp_unsubscribe(headers:HeaderMap):Unit = {
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala Wed Feb 1 04:36:04 2012
@@ -29,7 +29,6 @@ import javax.servlet.http.{HttpServletRe
import java.io.UnsupportedEncodingException
import org.apache.activemq.apollo.broker._
import security.{SecuredResource, Authorizer, SecurityContext, Authenticator}
-import util.continuations._
import org.apache.activemq.apollo.util._
import java.net.{InetSocketAddress, URI}
import java.security.cert.X509Certificate
@@ -249,9 +248,8 @@ abstract class Resource(parent:Resource=
}
}
}
-
- reset {
- if( authenticator.authenticate(security_context)==null ) {
+ authenticator.authenticate(security_context) { failure=>
+ if( failure==null ) {
call_func_with_security
} else {
func(null)