You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/16 04:13:40 UTC

svn commit: r1373696 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala

Author: chirino
Date: Thu Aug 16 02:13:39 2012
New Revision: 1373696

URL: http://svn.apache.org/viewvc?rev=1373696&view=rev
Log:
Pick up hawtdispatch interface changes.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1373696&r1=1373695&r2=1373696&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Thu Aug 16 02:13:39 2012
@@ -34,7 +34,7 @@ import scala.reflect.BeanProperty
 import java.nio.ByteBuffer
 import java.nio.channels._
 import scala.collection.mutable.ListBuffer
-import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.{ExecutorService, Executor, ArrayBlockingQueue}
 import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
 import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
 import java.io.{EOFException, IOException}
@@ -65,6 +65,9 @@ object WebSocketTransportFactory extends
     @BeanProperty
     var dispatchQueue = createQueue()
     @BeanProperty
+    var blockingExecutor:Executor = _
+
+    @BeanProperty
     var transportServerListener: TransportServerListener = _
     @BeanProperty
     var binary_transfers = false
@@ -72,6 +75,7 @@ object WebSocketTransportFactory extends
     var cors_origin:String = null
 
     var broker: Broker = _
+    var blocking_executor: Executor = _
 
     def set_broker(value: Broker) = broker = value
 
@@ -83,7 +87,7 @@ object WebSocketTransportFactory extends
     def start(on_completed: Runnable):Unit = super.start(new TaskWrapper(on_completed))
     def stop(on_completed: Runnable):Unit = super.stop(new TaskWrapper(on_completed))
 
-    protected def _start(on_completed: Task) = Broker.BLOCKABLE_THREAD_POOL {
+    protected def _start(on_completed: Task) = blockingExecutor {
       this.synchronized {
 
         IntrospectionSupport.setProperties(this, URISupport.parseParamters(uri));
@@ -138,14 +142,14 @@ object WebSocketTransportFactory extends
         server = new Server
         server.setHandler(context)
         server.setConnectors(Array(connector))
-        server.setThreadPool(new ExecutorThreadPool(Broker.BLOCKABLE_THREAD_POOL))
+        server.setThreadPool(new ExecutorThreadPool(blockingExecutor.asInstanceOf[ExecutorService]))
         server.start
 
         on_completed.run
       }
     }
 
-    def _stop(on_complete: Task) = Broker.BLOCKABLE_THREAD_POOL {
+    def _stop(on_complete: Task) = blockingExecutor {
       this.synchronized {
         if (server != null) {
           try {
@@ -178,7 +182,7 @@ object WebSocketTransportFactory extends
         if (service_state.is_started) {
           transportServerListener.onAccept(transport)
         } else {
-          Broker.BLOCKABLE_THREAD_POOL {
+          blockingExecutor {
             transport.connection.disconnect();
           }
         }
@@ -198,6 +202,9 @@ object WebSocketTransportFactory extends
     // Transport interface methods.
     /////////////////////////////////////////////////////////////////////////
     
+    @BeanProperty
+    var blockingExecutor:Executor = _
+
     var dispatchQueue = createQueue()
 
     def getDispatchQueue: DispatchQueue = dispatchQueue
@@ -357,7 +364,7 @@ object WebSocketTransportFactory extends
         }
       }
 
-      Broker.BLOCKABLE_THREAD_POOL {
+      blockingExecutor {
         inbound.synchronized {
           inbound_capacity_remaining += rc
           inbound.notify();
@@ -492,7 +499,7 @@ object WebSocketTransportFactory extends
 
     var outbound_capacity_remaining = 1024 * 64;
 
-    val outbound_executor = new SerialExecutor(Broker.BLOCKABLE_THREAD_POOL) {
+    val outbound_executor = new SerialExecutor(blockingExecutor) {
       var outbound_drained = 0
       override def drained  = {
         val amount = outbound_drained