You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/01 05:36:05 UTC

svn commit: r1238933 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/br...

Author: chirino
Date: Wed Feb  1 04:36:04 2012
New Revision: 1238933

URL: http://svn.apache.org/viewvc?rev=1238933&view=rev
Log:
Stopped using the continuations API and compiler plugin. It just added too much voodoo and therefore increased project complexity.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authenticator.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-scala/pom.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Wed Feb  1 04:36:04 2012
@@ -24,10 +24,9 @@ import atomic.{AtomicReference, AtomicLo
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.ListEventAggregator
-import org.apache.activemq.apollo.dto.{StoreStatusDTO, IntMetricDTO, TimeMetricDTO, StoreDTO}
+import org.apache.activemq.apollo.dto.StoreStatusDTO
 import org.apache.activemq.apollo.util.OptionSupport._
-import java.io.{InputStream, OutputStream, File}
-import scala.util.continuations._
+import java.io.{InputStream, OutputStream}
 import org.fusesource.hawtbuf.Buffer
 
 /**
@@ -42,7 +41,6 @@ object BDBStore extends Log {
  */
 class BDBStore(var config:BDBStoreDTO) extends DelayingStoreSupport {
 
-  import BDBStore._
 
   var next_queue_key = new AtomicLong(1)
   var next_msg_key = new AtomicLong(1)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Feb  1 04:36:04 2012
@@ -603,11 +603,13 @@ class Broker() extends BaseService with 
     }
   }
 
-  def get_virtual_host(name: AsciiBuffer) = dispatch_queue ! {
+  def get_virtual_host(name: AsciiBuffer) = {
+    dispatch_queue.assertExecuting()
     virtual_hosts_by_hostname.getOrElse(name, null)
   }
 
-  def get_default_virtual_host = dispatch_queue ! {
+  def get_default_virtual_host = {
+    dispatch_queue.assertExecuting()
     default_virtual_host
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Feb  1 04:36:04 2012
@@ -1008,49 +1008,44 @@ class LocalRouter(val virtual_host:Virtu
   def topic_domain:Domain[_ <: DomainDestination, TopicDestinationDTO] = local_topic_domain
   def dsub_domain:Domain[_ <: DomainDestination, DurableSubscriptionDestinationDTO] = local_dsub_domain
 
-  def bind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, security: SecurityContext) = {
-    consumer.retain
-    dispatch_queue ! {
-      var rc:Option[String] = None
-      if(rc.isEmpty && !virtual_host.service_state.is_started) {
-        rc = Some("virtual host stopped.")
-      } else if (rc.isEmpty) {
-        try {
-          val actions = destinations.map { destination =>
-            destination match {
-              case destination:TopicDestinationDTO =>
-                val path = destination_parser.decode_path(destination.path)
-                val allowed = topic_domain.can_bind_all(path, destination, consumer, security)
-                def perform() = topic_domain.bind(path, destination, consumer, security)
-                (allowed, perform _)
-              case destination:QueueDestinationDTO =>
-                val path = destination_parser.decode_path(destination.path)
-                val allowed = queue_domain.can_bind_all(path, destination, consumer, security)
-                def perform() = queue_domain.bind(path, destination, consumer, security)
-                (allowed, perform _)
-              case destination:DurableSubscriptionDestinationDTO =>
-                val path = Path(destination.subscription_id())
-                val allowed = dsub_domain.can_bind_all(path, destination, consumer, security)
-                def perform() = dsub_domain.bind(path, destination, consumer, security)
-                (allowed, perform _)
-              case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
-            }
+  def bind(destinations: Array[DestinationDTO], consumer: DeliveryConsumer, security: SecurityContext):Option[String] = {
+    dispatch_queue.assertExecuting()
+    if(!virtual_host.service_state.is_started) {
+      return Some("virtual host stopped.")
+    } else {
+      try {
+        val actions = destinations.map { destination =>
+          destination match {
+            case destination:TopicDestinationDTO =>
+              val path = destination_parser.decode_path(destination.path)
+              val allowed = topic_domain.can_bind_all(path, destination, consumer, security)
+              def perform() = topic_domain.bind(path, destination, consumer, security)
+              (allowed, perform _)
+            case destination:QueueDestinationDTO =>
+              val path = destination_parser.decode_path(destination.path)
+              val allowed = queue_domain.can_bind_all(path, destination, consumer, security)
+              def perform() = queue_domain.bind(path, destination, consumer, security)
+              (allowed, perform _)
+            case destination:DurableSubscriptionDestinationDTO =>
+              val path = Path(destination.subscription_id())
+              val allowed = dsub_domain.can_bind_all(path, destination, consumer, security)
+              def perform() = dsub_domain.bind(path, destination, consumer, security)
+              (allowed, perform _)
+            case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
           }
+        }
 
-          val failures = actions.flatMap(_._1)
-          rc = if( !failures.isEmpty ) {
-            Some(failures.mkString("; "))
-          } else {
-            actions.foreach(_._2())
-            None
-          }
-        } catch {
-          case x:PathException =>
-            rc = Some(x.getMessage)
+        val failures = actions.flatMap(_._1)
+        if( !failures.isEmpty ) {
+          return Some(failures.mkString("; "))
+        } else {
+          actions.foreach(_._2())
+          return None
         }
+      } catch {
+        case x:PathException =>
+          return Some(x.getMessage)
       }
-      consumer.release
-      rc
     }
   }
 
@@ -1072,14 +1067,13 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-  def connect(destinations: Array[DestinationDTO], producer: BindableDeliveryProducer, security: SecurityContext) = {
+  def connect(destinations: Array[DestinationDTO], producer: BindableDeliveryProducer, security: SecurityContext):Option[String] = {
+    dispatch_queue.assertExecuting()
     producer.retain
-    dispatch_queue ! {
-      var rc:Option[String] = None
-      if(rc.isEmpty && !virtual_host.service_state.is_started) {
-        rc = Some("virtual host stopped.")
-      } else if(rc.isEmpty) {
-
+    try {
+      if(!virtual_host.service_state.is_started) {
+        return Some("virtual host stopped.")
+      } else {
         val actions = destinations.map { destination =>
           destination match {
             case destination:TopicDestinationDTO =>
@@ -1100,19 +1094,19 @@ class LocalRouter(val virtual_host:Virtu
             case _ => throw new RuntimeException("Unknown domain type: "+destination.getClass)
           }
         }
-
+  
         val failures = actions.flatMap(_._1)
-        rc = if( !failures.isEmpty ) {
-          Some(failures.mkString("; "))
+        if( !failures.isEmpty ) {
+          return Some(failures.mkString("; "))
         } else {
           actions.foreach(_._2())
           producer.connected()
           producer.retain()
-          None
+          return None
         }
       }
+    } finally {
       producer.release
-      rc
     }
   }
 
@@ -1134,9 +1128,10 @@ class LocalRouter(val virtual_host:Virtu
     }
   }
 
-  def create(destinations:Array[DestinationDTO], security: SecurityContext) = dispatch_queue ! {
+  def create(destinations:Array[DestinationDTO], security: SecurityContext):Option[String] = {
+    dispatch_queue.assertExecuting()
     if(!virtual_host.service_state.is_started) {
-      Some("virtual host stopped.")
+      return Some("virtual host stopped.")
     } else {
 
       val actions = destinations.map { destination =>
@@ -1162,17 +1157,18 @@ class LocalRouter(val virtual_host:Virtu
 
       val failures = actions.flatMap(_._1)
       if( !failures.isEmpty ) {
-        Some(failures.mkString("; "))
+        return Some(failures.mkString("; "))
       } else {
         actions.foreach(_._2())
-        None
+        return None
       }
     }
   }
 
-  def delete(destinations:Array[DestinationDTO], security: SecurityContext) = dispatch_queue ! {
+  def delete(destinations:Array[DestinationDTO], security: SecurityContext):Option[String] = {
+    dispatch_queue.assertExecuting()
     if(!virtual_host.service_state.is_started) {
-      Some("virtual host stopped.")
+      return Some("virtual host stopped.")
     } else {
 
       val actions = destinations.map { destination =>
@@ -1198,12 +1194,11 @@ class LocalRouter(val virtual_host:Virtu
 
       val failures = actions.flatMap(_._1)
       if( !failures.isEmpty ) {
-        Some(failures.mkString("; "))
+        return Some(failures.mkString("; "))
       } else {
         actions.foreach(_._2())
-        None
+        return None
       }
-
     }
   }
 
@@ -1249,7 +1244,8 @@ class LocalRouter(val virtual_host:Virtu
   /**
    * Gets an existing queue.
    */
-  def get_queue(id:Long) = dispatch_queue ! {
+  def get_queue(id:Long) = {
+    dispatch_queue.assertExecuting()
     queues_by_store_id.get(id)
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Feb  1 04:36:04 2012
@@ -22,8 +22,7 @@ import scala.collection.immutable.List
 import org.apache.activemq.apollo.dto._
 import security.SecurityContext
 import store.StoreUOW
-import util.continuations._
-import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -32,19 +31,19 @@ trait Router extends Service {
 
   def virtual_host:VirtualHost
 
-  def get_queue(dto:Long):Option[Queue] @suspendable
+  def get_queue(dto:Long):Option[Queue]
 
-  def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext) : Option[String] @suspendable
+  def bind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, security:SecurityContext): Option[String]
 
   def unbind(destinations:Array[DestinationDTO], consumer:DeliveryConsumer, persistent:Boolean, security:SecurityContext)
 
-  def connect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer, security:SecurityContext): Option[String] @suspendable
+  def connect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer, security:SecurityContext): Option[String]
 
   def disconnect(destinations:Array[DestinationDTO], producer:BindableDeliveryProducer)
 
-  def delete(destinations:Array[DestinationDTO], security:SecurityContext): Option[String] @suspendable
+  def delete(destinations:Array[DestinationDTO], security:SecurityContext): Option[String]
 
-  def create(destinations:Array[DestinationDTO], security:SecurityContext): Option[String] @suspendable
+  def create(destinations:Array[DestinationDTO], security:SecurityContext): Option[String]
 
   def apply_update(on_completed:Runnable):Unit
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authenticator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authenticator.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authenticator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/Authenticator.scala Wed Feb  1 04:36:04 2012
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 package org.apache.activemq.apollo.broker.security
-import scala.util.continuations._
 
 /**
  * <p>
@@ -32,7 +31,7 @@ trait Authenticator {
    * @returns null if the SecurityContext was authenticated. Otherwise
    * returns an error message that can be given to a client.
    */
-  def authenticate(ctx:SecurityContext):String @suspendable
+  def authenticate(ctx:SecurityContext)(cb:(String)=>Unit)
 
   /**
    * Extracts the user name of the logged in user.

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/security/JaasAuthenticator.scala Wed Feb  1 04:36:04 2012
@@ -63,8 +63,8 @@ class JaasAuthenticator(val config: Auth
    * execute on the global thread pool since JAAS requests could
    * potentially perform a blocking wait (e.g. LDAP request).
    */
-  def authenticate(security_ctx: SecurityContext) = BLOCKABLE_THREAD_POOL ! {
-    _authenticate(security_ctx)
+  def authenticate(security_ctx: SecurityContext)(cb:(String)=>Unit) = BLOCKABLE_THREAD_POOL {
+    cb(_authenticate(security_ctx))
   }
 
   /**

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/DestinationConfigurationTest.scala Wed Feb  1 04:36:04 2012
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.apollo.broker
 
-import org.fusesource.hawtbuf.Buffer._
-import scala.util.continuations._
 import org.apache.activemq.apollo.util.{ServiceControl, FunSuiteSupport}
-import org.apache.activemq.apollo.dto._
 
 /**
  * <p>

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala Wed Feb  1 04:36:04 2012
@@ -26,10 +26,8 @@ import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.ListEventAggregator
 import org.apache.activemq.apollo.dto.StoreStatusDTO
 import org.apache.activemq.apollo.util.OptionSupport._
-import scala.util.continuations._
 import java.io._
 import org.apache.activemq.apollo.web.resources.ViewHelper
-import collection.mutable.ListBuffer
 import org.fusesource.hawtbuf.Buffer
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Wed Feb  1 04:36:04 2012
@@ -22,7 +22,6 @@ import org.apache.activemq.apollo.broker
 import collection.mutable.HashMap
 import DestinationConverter._
 import support.advisory.AdvisorySupport
-import scala.util.continuations._
 import org.apache.activemq.apollo.util._
 import java.util.Map.Entry
 import org.apache.activemq.apollo.broker._
@@ -184,13 +183,10 @@ class DestinationAdvisoryRouterListener(
         // create the producer route...
         val route = new ProducerRoute
         producerRoutes.put(key, route)
-        reset {
-          val rc = router.connect(dest, route, null)
-          rc match {
-            case Some(failure) =>
-              warn("Could not connect to advisory topic: " + message.getDestination)
-            case None =>
-          }
+        val rc = router.connect(dest, route, null)
+        rc match {
+          case Some(failure) => warn("Could not connect to advisory topic: " + message.getDestination)
+          case None =>
         }
         route
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Feb  1 04:36:04 2012
@@ -30,7 +30,6 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.util._
 import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
-import scala.util.continuations._
 import org.fusesource.hawtdispatch.transport._
 import codec.OpenWireFormat
 import command._
@@ -196,13 +195,16 @@ class OpenwireProtocolHandler extends Pr
     connection.transport.offer(preferred_wireformat_settings)
 
     resume_read
-    reset {
-      suspend_read("virtual host lookup")
-      this.host = broker.get_default_virtual_host
-      connection_log = this.host.connection_log
-      resume_read
-      if(host==null) {
-        async_die("Could not find default virtual host")
+    suspend_read("virtual host lookup")
+    broker.dispatch_queue {
+      var host = broker.get_default_virtual_host
+      dispatchQueue {
+        this.host = host
+        connection_log = this.host.connection_log
+        resume_read
+        if(host==null) {
+          async_die("Could not find default virtual host")
+        }
       }
     }
   }
@@ -432,28 +434,24 @@ class OpenwireProtocolHandler extends Pr
       security_context.password = Option(info.getPassword).map(_.toString).getOrElse(null)
       security_context.session_id = Some(OPENWIRE_PARSER.sanitize_destination_part(info.getConnectionId.toString))
 
-      reset {
-        if( host.authenticator!=null &&  host.authorizer!=null ) {
-          suspend_read("authenticating and authorizing connect")
-          val auth_failure = host.authenticator.authenticate(security_context)
-          if( auth_failure!=null ) {
-            async_die(auth_failure+". Credentials="+security_context.credential_dump)
-            noop // to make the cps compiler plugin happy.
-          } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
-            async_die("Not authorized to connect to connector '%s'. Principals=".format(connection.connector.id, security_context.principal_dump))
-            noop // to make the cps compiler plugin happy.
-          } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
-            async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
-            noop // to make the cps compiler plugin happy.
-          } else {
-            resume_read
-            ack(info);
-            noop
+      if( host.authenticator!=null &&  host.authorizer!=null ) {
+        suspend_read("authenticating and authorizing connect")
+        host.authenticator.authenticate(security_context) { auth_failure =>
+          dispatchQueue {
+            if( auth_failure!=null ) {
+              async_die(auth_failure+". Credentials="+security_context.credential_dump)
+            } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
+              async_die("Not authorized to connect to connector '%s'. Principals=".format(connection.connector.id, security_context.principal_dump))
+            } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
+              async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
+            } else {
+              resume_read
+              ack(info);
+            }
           }
-        } else {
-          ack(info);
-          noop
         }
+      } else {
+        ack(info);
       }
     } else {
       ack(info);
@@ -493,18 +491,20 @@ class OpenwireProtocolHandler extends Pr
 //    if( info.getDestination.isTemporary ) {
 //      destinations.foreach(_.temp_owner = connection.id)
 //    }
-    reset{
+    host.dispatch_queue {
       val rc = info.getOperationType match {
         case DestinationInfo.ADD_OPERATION_TYPE=>
           host.router.create(destinations, security_context)
         case DestinationInfo.REMOVE_OPERATION_TYPE=>
           host.router.delete(destinations, security_context)
       }
-      rc match {
-        case None =>
-          ack(info)
-        case Some(error)=>
-          ack(info)
+      dispatchQueue {
+        rc match {
+          case None =>
+            ack(info)
+          case Some(error)=>
+            ack(info)
+        }
       }
     }
   }
@@ -619,17 +619,19 @@ class OpenwireProtocolHandler extends Pr
 
         // don't process frames until producer is connected...
         connection.transport.suspendRead
-        reset {
+        host.dispatch_queue {
           val rc = host.router.connect(destiantion, route, security_context)
-          rc match {
-            case Some(failure) =>
-              async_die(failure, msg)
-            case None =>
-              if (!connection.stopped) {
-                resume_read
-                producerRoutes.put(key, route)
-                send_via_route(route, msg, uow)
-              }
+          dispatchQueue {
+            rc match {
+              case Some(failure) =>
+                async_die(failure, msg)
+              case None =>
+                if (!connection.stopped) {
+                  resume_read
+                  producerRoutes.put(key, route)
+                  send_via_route(route, msg, uow)
+                }
+            }
           }
         }
 
@@ -751,8 +753,6 @@ class OpenwireProtocolHandler extends Pr
     }
   }
 
-  def noop = shift {  k: (Unit=>Unit) => k() }
-
   class ProducerContext(val parent: SessionContext, val info: ProducerInfo) {
     def attach = {
       parent.producers.put(info.getProducerId, this)
@@ -875,18 +875,18 @@ class OpenwireProtocolHandler extends Pr
         destination = Array(rc)
       }
 
-      reset {
+      host.dispatch_queue {
         val rc = host.router.bind(destination, this, security_context)
-        rc match {
-          case None =>
-            ack(info)
-            noop
-          case Some(reason) =>
-            async_fail(reason, info)
-            noop
+        this.release
+        dispatchQueue {
+          rc match {
+            case None =>
+              ack(info)
+            case Some(reason) =>
+              async_fail(reason, info)
+          }
         }
       }
-      this.release
     }
 
     def dettach = {
@@ -962,13 +962,13 @@ class OpenwireProtocolHandler extends Pr
           }
         })
         if( info.getDestination.isTemporary ) {
-          reset {
+          dispatch_queue {
             val rc = host.router.delete(destination, security_context)
-            rc match {
-              case Some(error) =>
-                async_die(error)
-              case None =>
-                unit
+            dispatchQueue {
+              rc match {
+                case Some(error) => async_die(error)
+                case None =>
+              }
             }
           }
         }

Modified: activemq/activemq-apollo/trunk/apollo-scala/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-scala/pom.xml?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-scala/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-scala/pom.xml Wed Feb  1 04:36:04 2012
@@ -82,15 +82,9 @@
             <configuration>
               <args>
                 <arg>-deprecation</arg>
-                <arg>-P:continuations:enable</arg>
               </args>
               <compilerPlugins>
                 <compilerPlugin>
-                  <groupId>org.scala-lang.plugins</groupId>
-                  <artifactId>continuations</artifactId>
-                  <version>${scala-version}</version>
-                </compilerPlugin>
-                <compilerPlugin>
                   <groupId>org.fusesource.jvmassert</groupId>
                   <artifactId>jvmassert</artifactId>
                   <version>1.2</version>
@@ -141,15 +135,9 @@
             <configuration>
               <args>
                 <arg>-deprecation</arg>
-                <arg>-P:continuations:enable</arg>
               </args>
               <compilerPlugins>
                 <compilerPlugin>
-                  <groupId>org.scala-lang.plugins</groupId>
-                  <artifactId>continuations</artifactId>
-                  <version>${scala-version}</version>
-                </compilerPlugin>
-                <compilerPlugin>
                   <groupId>org.fusesource.jvmassert</groupId>
                   <artifactId>jvmassert</artifactId>
                   <version>1.2</version>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Feb  1 04:36:04 2012
@@ -33,8 +33,6 @@ import org.apache.activemq.apollo.util._
 import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
 import path.PathParser
-import path.PathParser._
-import scala.util.continuations._
 import java.security.cert.X509Certificate
 import collection.mutable.{ListBuffer, HashMap}
 import java.io.IOException
@@ -55,7 +53,6 @@ object BufferSupport {
   implicit def to_rich_buffer(value:Buffer):RichBuffer = RichBuffer(value)
 }
 
-import BufferSupport._
 
 object StompProtocolHandler extends Log {
 
@@ -78,9 +75,6 @@ object StompProtocolHandler extends Log 
   val WAITING_ON_SHUTDOWN: () => String = () => {
     "shutdown"
   }
-
-  def noop = shift {  k: (Unit=>Unit) => k() }
-  def unit:Unit = {}
 }
 
 /**
@@ -908,53 +902,48 @@ class StompProtocolHandler extends Proto
       codec.direct_buffer_allocator = this.host.direct_buffer_allocator
     }
 
-    reset {
-      suspend_read("virtual host lookup")
-      val host_header = get(headers, HOST)
+    suspend_read("virtual host lookup")
+    val host_header = get(headers, HOST)
+
+    broker.dispatch_queue {
       val host = host_header match {
-        case None=>
-          connection.connector.broker.get_default_virtual_host
-        case Some(host)=>
-          connection.connector.broker.get_virtual_host(host)
-      }
-      resume_read
-
-      if(host==null) {
-        async_die("Invalid virtual host: "+host_header.get)
-        noop
-      } else if(!host.service_state.is_started) {
-        var headers = (MESSAGE_HEADER, encode_header("Virtual host stopped")) :: Nil
-        host.client_redirect.foreach(x=> headers ::= REDIRECT_HEADER->encode_header(x) )
-        async_die(headers, "")
-        noop
-      } else {
-        this.host=host
-        security_context.session_id = Some("%s-%x".format(destination_parser.sanitize_destination_part(this.host.config.id), this.host.session_counter.incrementAndGet))
-        connection_log = host.connection_log
-        if( host.authenticator!=null &&  host.authorizer!=null ) {
-          suspend_read("authenticating and authorizing connect")
-          var auth_failure = host.authenticator.authenticate(security_context)
-          if( auth_failure!=null ) {
-            async_die(auth_failure+". Credentials="+security_context.credential_dump)
-            noop // to make the cps compiler plugin happy.
-          } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
-            async_die("Not authorized to connect to connector '%s'. Principals=".format(connection.connector.id, security_context.principal_dump))
-            noop // to make the cps compiler plugin happy.
-          } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
-            async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
-            noop // to make the cps compiler plugin happy.
+        case None=> broker.default_virtual_host
+        case Some(host)=> broker.get_virtual_host(host)
+      }
+      dispatchQueue {
+        resume_read
+        if(host==null) {
+          async_die("Invalid virtual host: "+host_header.get)
+        } else if(!host.service_state.is_started) {
+          var headers = (MESSAGE_HEADER, encode_header("Virtual host stopped")) :: Nil
+          host.client_redirect.foreach(x=> headers ::= REDIRECT_HEADER->encode_header(x) )
+          async_die(headers, "")
+        } else {
+          this.host=host
+          security_context.session_id = Some("%s-%x".format(destination_parser.sanitize_destination_part(this.host.config.id), this.host.session_counter.incrementAndGet))
+          connection_log = host.connection_log
+          if( host.authenticator!=null &&  host.authorizer!=null ) {
+            suspend_read("authenticating and authorizing connect")
+            host.authenticator.authenticate(security_context) { auth_failure=>
+              dispatchQueue {
+                if( auth_failure!=null ) {
+                  async_die(auth_failure+". Credentials="+security_context.credential_dump)
+                } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
+                  async_die("Not authorized to connect to connector '%s'. Principals=".format(connection.connector.id, security_context.principal_dump))
+                } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
+                  async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
+                } else {
+                  resume_read
+                  send_connected
+                }
+              }
+            }
           } else {
-            resume_read
             send_connected
-            noop // to make the cps compiler plugin happy.
           }
-        } else {
-          send_connected
-          noop // to make the cps compiler plugin happy.
         }
       }
     }
-
   }
 
   def get(headers:HeaderMap, names:List[AsciiBuffer]):List[Option[AsciiBuffer]] = {
@@ -1013,17 +1002,19 @@ class StompProtocolHandler extends Proto
 
         // don't process frames until producer is connected...
         connection.transport.suspendRead
-        reset {
+        host.dispatch_queue {
           val rc = host.router.connect(destination, route, security_context)
-          rc match {
-            case Some(failure) =>
-              async_die(failure)
-            case None =>
-              if (!connection.stopped) {
-                resume_read
-                producerRoutes.put(key, route)
-                send_via_route(destination, route, frame, uow)
-              }
+          dispatchQueue {
+            rc match {
+              case Some(failure) =>
+                async_die(failure)
+              case None =>
+                if (!connection.stopped) {
+                  resume_read
+                  producerRoutes.put(key, route)
+                  send_via_route(destination, route, frame, uow)
+                }
+            }
           }
         }
 
@@ -1258,38 +1249,19 @@ class StompProtocolHandler extends Proto
     val consumer = new StompConsumer(subscription_id, destination, ack_mode, selector, browser, exclusive, credit_window, include_seq, from_seq, browser_end);
     consumers += (id -> consumer)
 
-    reset {
+    host.dispatch_queue {
       val rc = host.router.bind(destination, consumer, security_context)
       consumer.release
-      rc match {
-        case Some(reason)=>
-          consumers -= id
-          async_die(reason)
-        case None =>
-          send_receipt(headers)
-          unit
+      dispatchQueue {
+        rc match {
+          case Some(reason)=>
+            consumers -= id
+            async_die(reason)
+          case None =>
+            send_receipt(headers)
+        }
       }
     }
-
-//      reset {
-//        // create a queue and bind the consumer to it.
-//        val x= host.router.get_or_create_queue(binding, security_context)
-//        x match {
-//          case Success(queue) =>
-//            val rc = queue.bind(consumer, security_context)
-//            consumer.release
-//            rc match {
-//              case Failure(reason)=>
-//                consumers -= id
-//                async_die(reason)
-//              case _ =>
-//                send_receipt(headers)
-//            }
-//          case Failure(reason) =>
-//            consumers -= id
-//            async_die(reason)
-//        }
-//      }
   }
 
   def on_stomp_unsubscribe(headers:HeaderMap):Unit = {

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala?rev=1238933&r1=1238932&r2=1238933&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala Wed Feb  1 04:36:04 2012
@@ -29,7 +29,6 @@ import javax.servlet.http.{HttpServletRe
 import java.io.UnsupportedEncodingException
 import org.apache.activemq.apollo.broker._
 import security.{SecuredResource, Authorizer, SecurityContext, Authenticator}
-import util.continuations._
 import org.apache.activemq.apollo.util._
 import java.net.{InetSocketAddress, URI}
 import java.security.cert.X509Certificate
@@ -249,9 +248,8 @@ abstract class Resource(parent:Resource=
           }
         }
       }
-
-      reset {
-        if( authenticator.authenticate(security_context)==null ) {
+      authenticator.authenticate(security_context) { failure=>
+        if( failure==null ) {
           call_func_with_security
         } else {
           func(null)