You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/01 11:15:22 UTC
svn commit: r1239037 -
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
Author: chirino
Date: Wed Feb 1 10:15:21 2012
New Revision: 1239037
URL: http://svn.apache.org/viewvc?rev=1239037&view=rev
Log:
WebSocket: Most browsers do not seem to support binary transfers yet, so default to text-based transfers. Also improves shutdown handling.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1239037&r1=1239036&r2=1239037&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Wed Feb 1 10:15:21 2012
@@ -34,11 +34,11 @@ import java.lang.Class
import scala.reflect.BeanProperty
import java.nio.ByteBuffer
import java.nio.channels._
-import java.io.IOException
import scala.collection.mutable.ListBuffer
-import org.fusesource.hawtbuf.Buffer
import java.util.concurrent.ArrayBlockingQueue
import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+import java.io.{EOFException, IOException}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -64,6 +64,8 @@ object WebSocketTransportFactory extends
var dispatchQueue = createQueue()
@BeanProperty
var transportServerListener: TransportServerListener = _
+ @BeanProperty
+ var binary_transfers = false
var broker: Broker = _
@@ -77,7 +79,8 @@ object WebSocketTransportFactory extends
protected def _start(on_completed: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
this.synchronized {
- // var options: Map[String, String] = new HashMap[String, String](URISupport.parseParamters(bind_uri))
+ IntrospectionSupport.setProperties(this, URISupport.parseParamters(uri));
+
accept_dispatch_queue = dispatchQueue.createQueue("accept: " + uri);
val prefix = "/" + uri.getPath.stripPrefix("/")
@@ -195,6 +198,9 @@ object WebSocketTransportFactory extends
var protocolCodec: ProtocolCodec = _
+ // Most browsers do not seem to support binary transfers yet, so only enable them if
+ // the client requests them or the transport server was configured to use them.
+ var binary_transfers = Option(request.getHeader("binary_transfers")).map(_=="true").getOrElse(server.binary_transfers)
def getProtocolCodec = protocolCodec
@@ -216,9 +222,15 @@ object WebSocketTransportFactory extends
on_completed.run()
}
-
protected def _stop(on_completed: Runnable) = {
inbound_dispatch_queue.resume()
+ outbound_executor {
+ // Wakes up any blocked reader thread..
+ inbound.synchronized {
+ inbound.notify();
+ }
+ connection.disconnect()
+ }
on_completed.run()
}
@@ -268,8 +280,8 @@ object WebSocketTransportFactory extends
def onMessage(str: String): Unit = {
// Convert string messages to bytes messages.. our codecs just work with bytes..
- var data = str.getBytes("UTF-8")
- onMessage(data, 0, data.length)
+ var buffer = new AsciiBuffer(str)
+ onMessage(buffer.data, buffer.offset, buffer.length)
}
var inbound_capacity_remaining = 1024 * 64;
@@ -473,10 +485,14 @@ object WebSocketTransportFactory extends
if (outbound_capacity_remaining <= 0) {
outbound_capacity_remaining -= remaining;
}
- var dup = buf.duplicate()
+
+ var buffer = new Buffer(buf.array(), buf.arrayOffset(), buf.remaining())
outbound_executor {
- println("Sending: "+ new Buffer(dup.array(), dup.arrayOffset(), dup.remaining()))
- connection.sendMessage(dup.array(), dup.arrayOffset(), dup.remaining())
+ if( !binary_transfers ) {
+ connection.sendMessage(buffer.ascii().toString)
+ } else {
+ connection.sendMessage(buffer.data, buffer.offset, buffer.length)
+ }
outbound_drained += remaining
}
buf.position(buf.position()+ remaining);