You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/01 11:15:22 UTC

svn commit: r1239037 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala

Author: chirino
Date: Wed Feb  1 10:15:21 2012
New Revision: 1239037

URL: http://svn.apache.org/viewvc?rev=1239037&view=rev
Log:
WebSocket: Seems most browsers don't support binary transfers yet.  Default to doing text based transfers.  Better shutdown handling.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1239037&r1=1239036&r2=1239037&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Wed Feb  1 10:15:21 2012
@@ -34,11 +34,11 @@ import java.lang.Class
 import scala.reflect.BeanProperty
 import java.nio.ByteBuffer
 import java.nio.channels._
-import java.io.IOException
 import scala.collection.mutable.ListBuffer
-import org.fusesource.hawtbuf.Buffer
 import java.util.concurrent.ArrayBlockingQueue
 import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+import java.io.{EOFException, IOException}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -64,6 +64,8 @@ object WebSocketTransportFactory extends
     var dispatchQueue = createQueue()
     @BeanProperty
     var transportServerListener: TransportServerListener = _
+    @BeanProperty
+    var binary_transfers = false
 
     var broker: Broker = _
 
@@ -77,7 +79,8 @@ object WebSocketTransportFactory extends
     protected def _start(on_completed: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
       this.synchronized {
 
-        // var options: Map[String, String] = new HashMap[String, String](URISupport.parseParamters(bind_uri))
+        IntrospectionSupport.setProperties(this, URISupport.parseParamters(uri));
+
         accept_dispatch_queue = dispatchQueue.createQueue("accept: " + uri);
 
         val prefix = "/" + uri.getPath.stripPrefix("/")
@@ -195,6 +198,9 @@ object WebSocketTransportFactory extends
 
     var protocolCodec: ProtocolCodec = _
 
+    // Most browsers don't seem to support binary transfers yet, so only enable them if
+    // the client requests them or the transport server was configured to use them.
+    var binary_transfers = Option(request.getHeader("binary_transfers")).map(_=="true").getOrElse(server.binary_transfers)
 
     def getProtocolCodec = protocolCodec
 
@@ -216,9 +222,15 @@ object WebSocketTransportFactory extends
       on_completed.run()
     }
   
-  
     protected def _stop(on_completed: Runnable) = {
       inbound_dispatch_queue.resume()
+      outbound_executor {
+        // Wakes up any blocked reader thread..
+        inbound.synchronized {
+          inbound.notify();
+        }
+        connection.disconnect()
+      }
       on_completed.run()
     }
     
@@ -268,8 +280,8 @@ object WebSocketTransportFactory extends
 
     def onMessage(str: String): Unit = {
       // Convert string messages to bytes messages..  our codecs just work with bytes..
-      var data = str.getBytes("UTF-8")
-      onMessage(data, 0, data.length)
+      var buffer = new AsciiBuffer(str)
+      onMessage(buffer.data, buffer.offset, buffer.length)
     }
 
     var inbound_capacity_remaining = 1024 * 64;
@@ -473,10 +485,14 @@ object WebSocketTransportFactory extends
         if (outbound_capacity_remaining <= 0) {
           outbound_capacity_remaining -= remaining;
         }
-        var dup = buf.duplicate()
+
+        var buffer = new Buffer(buf.array(), buf.arrayOffset(), buf.remaining())
         outbound_executor {
-          println("Sending: "+ new Buffer(dup.array(), dup.arrayOffset(), dup.remaining()))
-          connection.sendMessage(dup.array(), dup.arrayOffset(), dup.remaining())
+          if( !binary_transfers ) {
+            connection.sendMessage(buffer.ascii().toString)
+          } else {
+            connection.sendMessage(buffer.data, buffer.offset, buffer.length)
+          }
           outbound_drained += remaining
         }
         buf.position(buf.position()+ remaining);