You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/16 04:13:40 UTC
svn commit: r1373696 -
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
Author: chirino
Date: Thu Aug 16 02:13:39 2012
New Revision: 1373696
URL: http://svn.apache.org/viewvc?rev=1373696&view=rev
Log:
Pick up hawtdispatch interface changes.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1373696&r1=1373695&r2=1373696&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Thu Aug 16 02:13:39 2012
@@ -34,7 +34,7 @@ import scala.reflect.BeanProperty
import java.nio.ByteBuffer
import java.nio.channels._
import scala.collection.mutable.ListBuffer
-import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.{ExecutorService, Executor, ArrayBlockingQueue}
import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
import java.io.{EOFException, IOException}
@@ -65,6 +65,9 @@ object WebSocketTransportFactory extends
@BeanProperty
var dispatchQueue = createQueue()
@BeanProperty
+ var blockingExecutor:Executor = _
+
+ @BeanProperty
var transportServerListener: TransportServerListener = _
@BeanProperty
var binary_transfers = false
@@ -72,6 +75,7 @@ object WebSocketTransportFactory extends
var cors_origin:String = null
var broker: Broker = _
+ var blocking_executor: Executor = _
def set_broker(value: Broker) = broker = value
@@ -83,7 +87,7 @@ object WebSocketTransportFactory extends
def start(on_completed: Runnable):Unit = super.start(new TaskWrapper(on_completed))
def stop(on_completed: Runnable):Unit = super.stop(new TaskWrapper(on_completed))
- protected def _start(on_completed: Task) = Broker.BLOCKABLE_THREAD_POOL {
+ protected def _start(on_completed: Task) = blockingExecutor {
this.synchronized {
IntrospectionSupport.setProperties(this, URISupport.parseParamters(uri));
@@ -138,14 +142,14 @@ object WebSocketTransportFactory extends
server = new Server
server.setHandler(context)
server.setConnectors(Array(connector))
- server.setThreadPool(new ExecutorThreadPool(Broker.BLOCKABLE_THREAD_POOL))
+ server.setThreadPool(new ExecutorThreadPool(blockingExecutor.asInstanceOf[ExecutorService]))
server.start
on_completed.run
}
}
- def _stop(on_complete: Task) = Broker.BLOCKABLE_THREAD_POOL {
+ def _stop(on_complete: Task) = blockingExecutor {
this.synchronized {
if (server != null) {
try {
@@ -178,7 +182,7 @@ object WebSocketTransportFactory extends
if (service_state.is_started) {
transportServerListener.onAccept(transport)
} else {
- Broker.BLOCKABLE_THREAD_POOL {
+ blockingExecutor {
transport.connection.disconnect();
}
}
@@ -198,6 +202,9 @@ object WebSocketTransportFactory extends
// Transport interface methods.
/////////////////////////////////////////////////////////////////////////
+ @BeanProperty
+ var blockingExecutor:Executor = _
+
var dispatchQueue = createQueue()
def getDispatchQueue: DispatchQueue = dispatchQueue
@@ -357,7 +364,7 @@ object WebSocketTransportFactory extends
}
}
- Broker.BLOCKABLE_THREAD_POOL {
+ blockingExecutor {
inbound.synchronized {
inbound_capacity_remaining += rc
inbound.notify();
@@ -492,7 +499,7 @@ object WebSocketTransportFactory extends
var outbound_capacity_remaining = 1024 * 64;
- val outbound_executor = new SerialExecutor(Broker.BLOCKABLE_THREAD_POOL) {
+ val outbound_executor = new SerialExecutor(blockingExecutor) {
var outbound_drained = 0
override def drained = {
val amount = outbound_drained