You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/01 05:36:20 UTC

svn commit: r1238934 - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activem...

Author: chirino
Date: Wed Feb  1 04:36:19 2012
New Revision: 1238934

URL: http://svn.apache.org/viewvc?rev=1238934&view=rev
Log:
Fixes APLO-145 : Support WebSockets

This adds support for the ws:// and was:// transports.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerAware.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/SerialExecutor.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index?rev=1238934&r1=1238933&r2=1238934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index Wed Feb  1 04:36:19 2012
@@ -17,3 +17,4 @@
 org.apache.activemq.apollo.broker.transport.VMTransportFactory
 org.apache.activemq.apollo.broker.transport.TcpTransportFactory
 org.apache.activemq.apollo.broker.transport.SslTransportFactory
+org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerAware.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerAware.scala?rev=1238934&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerAware.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerAware.scala Wed Feb  1 04:36:19 2012
@@ -0,0 +1,13 @@
+package org.apache.activemq.apollo.broker
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait BrokerAware {
+
+  def set_broker(value:Broker):Unit
+
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1238934&r1=1238933&r2=1238934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed Feb  1 04:36:19 2012
@@ -200,6 +200,8 @@ class AcceptingConnector(val broker:Brok
     transport_server.setTransportServerListener(BrokerAcceptListener)
 
     transport_server match {
+      case transport_server:BrokerAware =>
+        transport_server.set_broker(broker)
       case transport_server:SslTransportServer =>
         transport_server.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL);
         if( broker.key_storage!=null ) {

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1238934&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Wed Feb  1 04:36:19 2012
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.jetty
+
+import org.fusesource.hawtdispatch._
+import org.fusesource.hawtdispatch.transport._
+import org.apache.activemq.apollo.broker.{Broker, BrokerAware}
+import org.apache.activemq.apollo.broker.transport.TransportFactory
+import org.apache.activemq.apollo.util._
+import org.eclipse.jetty.server.nio.SelectChannelConnector
+import javax.net.ssl.SSLContext
+import org.eclipse.jetty.server.ssl.SslSelectChannelConnector
+import org.eclipse.jetty.util.thread.ExecutorThreadPool
+import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
+import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.websocket.{WebSocket, WebSocketServlet}
+import org.eclipse.jetty.server.{Connector, Server}
+import java.net.{InetSocketAddress, URI}
+import java.lang.Class
+import scala.reflect.BeanProperty
+import java.nio.ByteBuffer
+import java.nio.channels._
+import java.io.IOException
+import scala.collection.mutable.ListBuffer
+import org.fusesource.hawtbuf.Buffer
+import java.util.concurrent.ArrayBlockingQueue
+import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object WebSocketTransportFactory extends TransportFactory.Provider with Log {
+
+  
+  def connect(location: String): Transport = {
+    return null
+  }
+
+  def bind(location: String): TransportServer = {
+    var uri: URI = new URI(location)
+    uri.getScheme match {
+      case "ws" | "wss" => WsTransportServer(uri)
+      case _ => null
+    }
+  }
+
+  case class WsTransportServer(uri: URI) extends WebSocketServlet with BaseService with TransportServer with BrokerAware {
+
+    @BeanProperty
+    var dispatchQueue = createQueue()
+    @BeanProperty
+    var transportServerListener: TransportServerListener = _
+
+    var broker: Broker = _
+
+    def set_broker(value: Broker) = broker = value
+
+    var server: Server = _
+    var connector: Connector = _
+
+    def dispatch_queue = dispatchQueue
+
+    protected def _start(on_completed: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
+      this.synchronized {
+
+        // var options: Map[String, String] = new HashMap[String, String](URISupport.parseParamters(bind_uri))
+        accept_dispatch_queue = dispatchQueue.createQueue("accept: " + uri);
+
+        val prefix = "/" + uri.getPath.stripPrefix("/")
+        val scheme = uri.getScheme
+        val host = uri.getHost
+        var port = uri.getPort
+
+        scheme match {
+          case "ws" =>
+            if (port == -1) {
+              port = 80
+            }
+          case "wss" =>
+            if (port == -1) {
+              port = 443
+            }
+          case _ => throw new Exception("Invalid bind protocol.")
+        }
+
+        connector = scheme match {
+          case "ws" => new SelectChannelConnector
+          case "wss" =>
+            val sslContext = if (broker.key_storage != null) {
+              val protocol = "TLS"
+              val sslContext = SSLContext.getInstance(protocol)
+              sslContext.init(broker.key_storage.create_key_managers, broker.key_storage.create_trust_managers, null)
+              sslContext
+            } else {
+              warn("You are using a transport that expects the broker's key storage to be configured.")
+              SSLContext.getDefault
+            }
+            val connector = new SslSelectChannelConnector
+            connector.setSslContext(sslContext)
+            connector.setWantClientAuth(true)
+            connector
+        }
+        connector.setHost(host)
+        connector.setPort(port)
+
+        var context = new ServletContextHandler(ServletContextHandler.NO_SECURITY)
+        context.setContextPath(prefix)
+        context.addServlet(new ServletHolder(this), "/")
+
+        server = new Server
+        server.setHandler(context)
+        server.setConnectors(Array(connector))
+        server.setThreadPool(new ExecutorThreadPool(Broker.BLOCKABLE_THREAD_POOL))
+        server.start
+
+        on_completed.run
+      }
+    }
+
+    def _stop(on_complete: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
+      this.synchronized {
+        if (server != null) {
+          try {
+            server.stop
+          } catch {
+            case ignore =>
+          }
+          server = null
+        }
+        on_complete.run
+      }
+    }
+
+    def getBoundAddress = {
+      val prefix = "/" + uri.getPath.stripPrefix("/")
+      new URI(uri.getScheme + "://" + uri.getHost + ":" + connector.getLocalPort + prefix).toString
+    }
+
+    def getConnectAddress = getBoundAddress
+
+    def getSocketAddress = new InetSocketAddress(uri.getHost, connector.getLocalPort)
+
+    val pending_connects = new ArrayBlockingQueue[WebSocketTransport](100)
+    var accept_dispatch_queue = dispatchQueue
+
+    def resume() = accept_dispatch_queue.resume()
+
+    def suspend() = accept_dispatch_queue.suspend()
+
+    def fire_accept = accept_dispatch_queue {
+      val transport = pending_connects.poll()
+      if (transport != null) {
+        if (service_state.is_started) {
+          transportServerListener.onAccept(transport)
+        } else {
+          Broker.BLOCKABLE_THREAD_POOL {
+            transport.connection.disconnect();
+          }
+        }
+      }
+    }
+
+    def doWebSocketConnect(request: HttpServletRequest, protocol: String) = WebSocketTransport(this, request, protocol)
+  }
+
+  /**
+   *
+   */
+  case class WebSocketTransport(server: WsTransportServer, request: HttpServletRequest, protocol: String) 
+          extends BaseService with WebSocket.OnTextMessage with WebSocket.OnBinaryMessage with Transport with ScatteringByteChannel with GatheringByteChannel {
+
+    /////////////////////////////////////////////////////////////////////////
+    // Transport interface methods.
+    /////////////////////////////////////////////////////////////////////////
+    
+    @BeanProperty
+    var dispatchQueue = createQueue()
+
+    @BeanProperty
+    var transportListener: TransportListener = _
+
+    var protocolCodec: ProtocolCodec = _
+
+
+    def getProtocolCodec = protocolCodec
+
+    def setProtocolCodec(protocolCodec: ProtocolCodec) = {
+      this.protocolCodec = protocolCodec
+      if( this.protocolCodec!=null ) {
+        this.protocolCodec.setReadableByteChannel(this)
+        this.protocolCodec.setWritableByteChannel(this)
+      }
+    }
+
+    def dispatch_queue = dispatchQueue
+
+    protected def _start(on_completed: Runnable) = {
+      inbound_dispatch_queue = dispatchQueue.createQueue(null);
+      inbound_dispatch_queue.suspend();
+      drain_outbound_events.setTargetQueue(dispatchQueue)
+      transportListener.onTransportConnected();
+      on_completed.run()
+    }
+  
+  
+    protected def _stop(on_completed: Runnable) = {
+      inbound_dispatch_queue.resume()
+      on_completed.run()
+    }
+    
+    def getLocalAddress = new InetSocketAddress(request.getLocalAddr, request.getLocalPort)
+    def getRemoteAddress = new InetSocketAddress(request.getRemoteHost, request.getRemotePort)
+    def getTypeId = server.uri.getScheme
+
+    def isConnected = connection == null || connection.isOpen
+    def isDisposed = connection == null
+    def isFaultTolerant = false
+
+    def reconnect(p1: URI) = throw new UnsupportedOperationException()
+
+    def narrow[T](target: Class[T]): T = {
+      if (target.isAssignableFrom(getClass())) {
+        return target.cast(this);
+      }
+      return null;
+    }
+    
+    /////////////////////////////////////////////////////////////////////////
+    //
+    // WebSocket Lifecycle Callbacks...
+    //
+    /////////////////////////////////////////////////////////////////////////
+    var connection: WebSocket.Connection = null
+    var closed: Option[(Int, String)] = None
+
+    def onOpen(connection: WebSocket.Connection): Unit = {
+      this.connection = connection
+      server.pending_connects.put(this)
+      server.fire_accept
+    }
+
+    def onClose(closeCode: Int, message: String) = dispatchQueue {
+      closed = Some(closeCode, message)
+      inbound_dispatch_queue {
+        drain_inbound
+      }
+    }
+
+    /////////////////////////////////////////////////////////////////////////
+    //
+    // This section handles in the inbound flow of messages
+    //
+    /////////////////////////////////////////////////////////////////////////
+
+    def onMessage(str: String): Unit = {
+      // Convert string messages to bytes messages..  our codecs just work with bytes..
+      var data = str.getBytes("UTF-8")
+      onMessage(data, 0, data.length)
+    }
+
+    var inbound_capacity_remaining = 1024 * 64;
+    val inbound = ListBuffer[Buffer]()
+
+    var inbound_dispatch_queue = dispatchQueue
+
+    def resumeRead() = {
+      inbound_dispatch_queue.resume()
+      inbound_dispatch_queue {
+        drain_inbound
+      }
+    }
+
+    def suspendRead() = inbound_dispatch_queue.suspend()
+
+    def onMessage(data: Array[Byte], offset: Int, length: Int): Unit = {
+      inbound.synchronized {
+        // flow control check..
+        while (inbound_capacity_remaining <= 0 && service_state.is_started) {
+          inbound.wait();
+        }
+        inbound_capacity_remaining -= length;
+      }
+      inbound_dispatch_queue {
+        inbound += new Buffer(data, offset, length)
+        drain_inbound
+      }
+    }
+
+
+    def close() {}
+
+    def isOpen = inbound.isEmpty && closed != None
+
+    def read(dest: ByteBuffer): Int = {
+      dispatch_queue.assertExecuting()
+
+      if (inbound.isEmpty && closed != None) {
+        return -1
+      }
+
+      var rc = 0
+      while (dest.hasRemaining && !inbound.isEmpty) {
+        val src = inbound.head;
+        val len = src.length.min(dest.remaining())
+        rc += len
+        dest.put(src.data, src.offset, len)
+        src.moveHead(len)
+        if (src.length == 0) {
+          inbound.remove(0)
+        }
+      }
+
+      Broker.BLOCKABLE_THREAD_POOL {
+        inbound.synchronized {
+          inbound_capacity_remaining += rc
+          inbound.notify();
+        }
+      }
+      rc
+    }
+
+    def read(dsts: Array[ByteBuffer]): Long = read(dsts, 0, dsts.length)
+
+    def read(dsts: Array[ByteBuffer], offset: Int, length: Int): Long = {
+      if (offset + length > dsts.length || length < 0 || offset < 0) {
+        throw new IndexOutOfBoundsException
+      }
+      var rc = 0L
+      var i: Int = 0
+      while (i < length) {
+        var dst: ByteBuffer = dsts(offset + i)
+        if (dst.hasRemaining) {
+          rc += read(dst)
+        }
+        if (dst.hasRemaining) {
+          return rc
+        }
+        i += 1;
+        i
+      }
+      rc
+    }
+
+  
+    protected def drain_inbound: Unit = {
+      dispatch_queue.assertExecuting()
+      try {
+        //        var initial = protocolCodec.getReadCounter
+        //        while (codec.getReadCounter - initial < codec.getReadBufferSize << 2) {
+        while (true) {
+          if (!service_state.is_started || inbound_dispatch_queue.isSuspended) {
+            return
+          }
+          var command = protocolCodec.read
+          if (command != null) {
+            println(command)
+            try {
+              transportListener.onTransportCommand(command)
+            } catch {
+              case e: Throwable => {
+                transportListener.onTransportFailure(new IOException("Transport listener failure."))
+              }
+            }
+          } else {
+            return
+          }
+        }
+        //        yieldSource.merge(1)
+      } catch {
+        case e: IOException => transportListener.onTransportFailure(e)
+      }
+    }
+
+    /////////////////////////////////////////////////////////////////////////
+    //
+    // This section handles in the outbound flow of messages
+    //
+    /////////////////////////////////////////////////////////////////////////
+
+    def full() = protocolCodec == null || protocolCodec.full();
+
+    def offer(command: AnyRef): Boolean = {
+      dispatchQueue.assertExecuting
+      try {
+        if (!service_state.is_started) {
+          throw new IOException("Not running.")
+        }
+        protocolCodec.write(command) match {
+          case BufferState.FULL =>
+            return false
+          case _ =>
+            drain_outbound_events.merge(1)
+            return true
+        }
+      }
+      catch {
+        case e: IOException => {
+          transportListener.onTransportFailure(e)
+          return false
+        }
+      }
+    }
+
+    val drain_outbound_events = Dispatch.createSource(EventAggregators.INTEGER_ADD, dispatchQueue)
+    drain_outbound_events.setEventHandler(^ { flush })
+    drain_outbound_events.resume
+
+    /**
+     *
+     */
+    def flush: Unit = {
+      dispatchQueue.assertExecuting
+      if (!service_state.is_started) {
+        return
+      }
+      try {
+        protocolCodec.flush
+      } catch {
+        case e: IOException => {
+          transportListener.onTransportFailure(e)
+        }
+      }
+    }
+    
+    def write(srcs: Array[ByteBuffer]): Long = write(srcs, 0, srcs.length)
+    def write(srcs: Array[ByteBuffer], offset: Int, length: Int): Long = {
+      if (offset + length > srcs.length || length < 0 || offset < 0) {
+        throw new IndexOutOfBoundsException
+      }
+      var rc: Long = 0
+      var i: Int = 0
+      while (i < length) {
+        var src: ByteBuffer = srcs(offset + i)
+        if (src.hasRemaining) {
+          rc += write(src)
+        }
+        if (src.hasRemaining) {
+          return rc
+        }
+        i += 1
+      }
+      rc
+    }
+
+    var outbound_capacity_remaining = 1024 * 64;
+    var outbound_drained = 0
+
+    val outbound_executor = new SerialExecutor(Broker.BLOCKABLE_THREAD_POOL) {
+      override def drained  = {
+        outbound_capacity_remaining += outbound_drained
+        outbound_drained = 0
+      }
+    }
+
+    def write(buf: ByteBuffer) = {
+      dispatchQueue.assertExecuting
+      var remaining = buf.remaining()
+      if( remaining > 0 ) {
+        if (outbound_capacity_remaining <= 0) {
+          outbound_capacity_remaining -= remaining;
+        }
+        var dup = buf.duplicate()
+        outbound_executor {
+          println("Sending: "+ new Buffer(dup.array(), dup.arrayOffset(), dup.remaining()))
+          connection.sendMessage(dup.array(), dup.arrayOffset(), dup.remaining())
+          outbound_drained += remaining
+        }
+        buf.position(buf.position()+ remaining);
+      }
+      remaining;
+    }
+
+  }
+
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/SerialExecutor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/SerialExecutor.scala?rev=1238934&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/SerialExecutor.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/SerialExecutor.scala Wed Feb  1 04:36:19 2012
@@ -0,0 +1,59 @@
+package org.apache.activemq.apollo.util
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
+
+/**
+ * <p>Provides serial execution of runnable tasks on any executor.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class SerialExecutor(target: Executor) extends Executor {
+
+  private final val triggered: AtomicBoolean = new AtomicBoolean
+  private final val queue = new ConcurrentLinkedQueue[Runnable]()
+
+  final def execute(action: Runnable) {
+    queue.add(action)
+    if (triggered.compareAndSet(false, true)) {
+      target.execute(self);
+    }
+  }
+
+  private final object self extends Runnable {
+    def run = drain
+  }
+
+  private final def drain: Unit = {
+    while (true) {
+      try {
+        var action = queue.poll
+        while (action != null) {
+          try {
+            action.run
+          } catch {
+            case e: Throwable =>
+              var thread: Thread = Thread.currentThread
+              thread.getUncaughtExceptionHandler.uncaughtException(thread, e)
+          }
+          action = queue.poll
+        }
+      } finally {
+        drained
+        triggered.set(false)
+        if (queue.isEmpty || !triggered.compareAndSet(false, true)) {
+          return
+        }
+      }
+    }
+  }
+
+  /**
+   * Subclasses can override this method so that it
+   * can perform some work once the execution queue
+   * is drained.
+   */
+  protected def drained: Unit = {
+  }
+
+}

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1238934&r1=1238933&r2=1238934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Wed Feb  1 04:36:19 2012
@@ -70,7 +70,7 @@
     <howl-version>0.1.8</howl-version>
     <hsqldb-version>1.7.2.2</hsqldb-version>
     <jdom-version>1.0</jdom-version>
-    <jetty-version>7.1.6.v20100715</jetty-version>
+    <jetty-version>7.5.1.v20110908</jetty-version>
     <jmock-version>1.0.1</jmock-version>
     <junit-version>4.7</junit-version>
     <jxta-version>2.0</jxta-version>