You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/01 05:36:20 UTC
svn commit: r1238934 - in /activemq/activemq-apollo/trunk: ./
apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activem...
Author: chirino
Date: Wed Feb 1 04:36:19 2012
New Revision: 1238934
URL: http://svn.apache.org/viewvc?rev=1238934&view=rev
Log:
Fixes APLO-145 : Support WebSockets
This adds support for the ws:// and wss:// transports.
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerAware.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/SerialExecutor.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index?rev=1238934&r1=1238933&r2=1238934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index Wed Feb 1 04:36:19 2012
@@ -17,3 +17,4 @@
org.apache.activemq.apollo.broker.transport.VMTransportFactory
org.apache.activemq.apollo.broker.transport.TcpTransportFactory
org.apache.activemq.apollo.broker.transport.SslTransportFactory
+org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory
Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerAware.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerAware.scala?rev=1238934&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerAware.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerAware.scala Wed Feb 1 04:36:19 2012
@@ -0,0 +1,13 @@
+package org.apache.activemq.apollo.broker
+
+/**
+ * <p>
+ * Implemented by components that need a reference to the Broker they are
+ * used by.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait BrokerAware {
+
+  /** Hands the broker instance to the implementing component. */
+  def set_broker(value:Broker):Unit
+
+}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1238934&r1=1238933&r2=1238934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed Feb 1 04:36:19 2012
@@ -200,6 +200,8 @@ class AcceptingConnector(val broker:Brok
transport_server.setTransportServerListener(BrokerAcceptListener)
transport_server match {
+ case transport_server:BrokerAware =>
+ transport_server.set_broker(broker)
case transport_server:SslTransportServer =>
transport_server.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL);
if( broker.key_storage!=null ) {
Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala?rev=1238934&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala Wed Feb 1 04:36:19 2012
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.jetty
+
+import org.fusesource.hawtdispatch._
+import org.fusesource.hawtdispatch.transport._
+import org.apache.activemq.apollo.broker.{Broker, BrokerAware}
+import org.apache.activemq.apollo.broker.transport.TransportFactory
+import org.apache.activemq.apollo.util._
+import org.eclipse.jetty.server.nio.SelectChannelConnector
+import javax.net.ssl.SSLContext
+import org.eclipse.jetty.server.ssl.SslSelectChannelConnector
+import org.eclipse.jetty.util.thread.ExecutorThreadPool
+import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
+import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.websocket.{WebSocket, WebSocketServlet}
+import org.eclipse.jetty.server.{Connector, Server}
+import java.net.{InetSocketAddress, URI}
+import java.lang.Class
+import scala.reflect.BeanProperty
+import java.nio.ByteBuffer
+import java.nio.channels._
+import java.io.IOException
+import scala.collection.mutable.ListBuffer
+import org.fusesource.hawtbuf.Buffer
+import java.util.concurrent.ArrayBlockingQueue
+import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object WebSocketTransportFactory extends TransportFactory.Provider with Log {
+
+
+  /**
+   * This factory does not create client side transports: every location
+   * yields a null transport.
+   */
+  def connect(location: String): Transport = null
+
+  /**
+   * Creates a web socket transport server for "ws" and "wss" locations.
+   *
+   * @param location the bind URI
+   * @return a WsTransportServer for the location, or null when the scheme
+   *         is not handled by this factory
+   */
+  def bind(location: String): TransportServer = {
+    val bind_uri = new URI(location)
+    val scheme = bind_uri.getScheme
+    if (scheme == "ws" || scheme == "wss") {
+      WsTransportServer(bind_uri)
+    } else {
+      null
+    }
+  }
+
+ case class WsTransportServer(uri: URI) extends WebSocketServlet with BaseService with TransportServer with BrokerAware {
+
+    // Base dispatch queue of this server; accept_dispatch_queue is created from it in _start.
+    @BeanProperty
+    var dispatchQueue = createQueue()
+    // Listener that receives each accepted transport (see fire_accept).
+    @BeanProperty
+    var transportServerListener: TransportServerListener = _
+
+    // Broker whose key storage is used to set up "wss" connectors; supplied through set_broker.
+    var broker: Broker = _
+
+    /** Stores the broker this transport server belongs to. */
+    def set_broker(value: Broker) = broker = value
+
+    // Embedded Jetty server and the connector it listens on; both are assigned in _start.
+    var server: Server = _
+    var connector: Connector = _
+
+    /** Alias for the dispatchQueue property. */
+    def dispatch_queue = dispatchQueue
+
+    /**
+     * Builds and starts the embedded Jetty server on the blockable thread
+     * pool, then runs the completion callback.
+     *
+     * The bind URI supplies the host, the port (80 for "ws" and 443 for "wss"
+     * when the URI has none) and the servlet context path.
+     *
+     * @param on_completed run once the Jetty server has been started
+     */
+    protected def _start(on_completed: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
+      this.synchronized {
+
+        accept_dispatch_queue = dispatchQueue.createQueue("accept: " + uri);
+
+        val context_path = "/" + uri.getPath.stripPrefix("/")
+        val bind_scheme = uri.getScheme
+        val bind_host = uri.getHost
+        val requested_port = uri.getPort
+
+        // A port of -1 means the URI did not carry one: use the scheme default.
+        val bind_port = bind_scheme match {
+          case "ws" => if (requested_port == -1) 80 else requested_port
+          case "wss" => if (requested_port == -1) 443 else requested_port
+          case _ => throw new Exception("Invalid bind protocol.")
+        }
+
+        connector = if (bind_scheme == "ws") {
+          new SelectChannelConnector
+        } else {
+          // "wss": prefer the broker's key storage, fall back to the JVM default context.
+          val ssl_context = if (broker.key_storage == null) {
+            warn("You are using a transport that expects the broker's key storage to be configured.")
+            SSLContext.getDefault
+          } else {
+            val configured = SSLContext.getInstance("TLS")
+            configured.init(broker.key_storage.create_key_managers, broker.key_storage.create_trust_managers, null)
+            configured
+          }
+          val ssl_connector = new SslSelectChannelConnector
+          ssl_connector.setSslContext(ssl_context)
+          ssl_connector.setWantClientAuth(true)
+          ssl_connector
+        }
+        connector.setHost(bind_host)
+        connector.setPort(bind_port)
+
+        // This servlet handles every request below the context path.
+        val servlet_context = new ServletContextHandler(ServletContextHandler.NO_SECURITY)
+        servlet_context.setContextPath(context_path)
+        servlet_context.addServlet(new ServletHolder(this), "/")
+
+        server = new Server
+        server.setHandler(servlet_context)
+        server.setConnectors(Array(connector))
+        server.setThreadPool(new ExecutorThreadPool(Broker.BLOCKABLE_THREAD_POOL))
+        server.start
+
+        on_completed.run
+      }
+    }
+
+    /**
+     * Stops the embedded Jetty server on the blockable thread pool and then
+     * runs the completion callback.
+     *
+     * Stopping stays best effort: a failure does not prevent the callback from
+     * running, but it is reported through the log instead of being dropped.
+     *
+     * @param on_complete run once the server reference has been released
+     */
+    def _stop(on_complete: Runnable) = Broker.BLOCKABLE_THREAD_POOL {
+      this.synchronized {
+        if (server != null) {
+          try {
+            server.stop
+          } catch {
+            case e: Throwable =>
+              warn("Could not cleanly stop the web socket server: " + e)
+          }
+          server = null
+        }
+        on_complete.run
+      }
+    }
+
+    /**
+     * The bind URI rewritten with the port the connector is actually
+     * listening on.
+     */
+    def getBoundAddress = {
+      val path = "/" + uri.getPath.stripPrefix("/")
+      val location = uri.getScheme + "://" + uri.getHost + ":" + connector.getLocalPort + path
+      new URI(location).toString
+    }
+
+    /** Clients connect to the same address the server is bound to. */
+    def getConnectAddress = getBoundAddress
+
+    /** Socket address made of the bind host and the connector's local port. */
+    def getSocketAddress = {
+      new InetSocketAddress(uri.getHost, connector.getLocalPort)
+    }
+
+    // Transports whose web socket has opened but which have not been handed
+    // to the listener yet; holds at most 100 entries.
+    val pending_connects = new ArrayBlockingQueue[WebSocketTransport](100)
+    // Queue on which accepts are delivered; initially dispatchQueue, replaced in _start.
+    var accept_dispatch_queue = dispatchQueue
+
+    /** Resumes the accept queue. */
+    def resume() = accept_dispatch_queue.resume()
+
+    /** Suspends the accept queue. */
+    def suspend() = accept_dispatch_queue.suspend()
+
+    /**
+     * Takes one pending transport, on the accept queue, and hands it to the
+     * transport server listener. If the server is not started the web socket
+     * connection is disconnected on the blockable thread pool instead.
+     */
+    def fire_accept = accept_dispatch_queue {
+      Option(pending_connects.poll()).foreach { accepted =>
+        if (!service_state.is_started) {
+          Broker.BLOCKABLE_THREAD_POOL {
+            accepted.connection.disconnect();
+          }
+        } else {
+          transportServerListener.onAccept(accepted)
+        }
+      }
+    }
+
+    /**
+     * Jetty callback: creates the transport that backs a new web socket
+     * connection.
+     */
+    def doWebSocketConnect(request: HttpServletRequest, protocol: String) = {
+      WebSocketTransport(this, request, protocol)
+    }
+ }
+
+ /**
+ *
+ */
+ case class WebSocketTransport(server: WsTransportServer, request: HttpServletRequest, protocol: String)
+ extends BaseService with WebSocket.OnTextMessage with WebSocket.OnBinaryMessage with Transport with ScatteringByteChannel with GatheringByteChannel {
+
+ /////////////////////////////////////////////////////////////////////////
+ // Transport interface methods.
+ /////////////////////////////////////////////////////////////////////////
+
+    // Queue on which all transport events are executed.
+    @BeanProperty
+    var dispatchQueue = createQueue()
+
+    // Receiver of connection, command and failure notifications.
+    @BeanProperty
+    var transportListener: TransportListener = _
+
+    // Codec that frames commands over this transport's byte channels.
+    var protocolCodec: ProtocolCodec = _
+
+    def getProtocolCodec = protocolCodec
+
+    /**
+     * Installs the codec and, when it is not null, makes this transport both
+     * its readable and its writable byte channel.
+     */
+    def setProtocolCodec(protocolCodec: ProtocolCodec) = {
+      this.protocolCodec = protocolCodec
+      val installed = this.protocolCodec
+      if (installed != null) {
+        installed.setReadableByteChannel(this)
+        installed.setWritableByteChannel(this)
+      }
+    }
+
+    /** Alias for the dispatchQueue property. */
+    def dispatch_queue = dispatchQueue
+
+    /**
+     * Starts the transport: creates the inbound queue as a child of
+     * dispatchQueue and leaves it suspended, retargets the outbound drain
+     * source to dispatchQueue, reports the connection to the listener and
+     * then runs the completion callback.
+     */
+    protected def _start(on_completed: Runnable) = {
+      inbound_dispatch_queue = dispatchQueue.createQueue(null);
+      // Reads stay off until resumeRead() is called.
+      inbound_dispatch_queue.suspend();
+      drain_outbound_events.setTargetQueue(dispatchQueue)
+      transportListener.onTransportConnected();
+      on_completed.run()
+    }
+
+
+    /**
+     * Stops the transport: resumes the inbound queue and runs the completion
+     * callback.
+     */
+    protected def _stop(on_completed: Runnable) = {
+      // NOTE(review): presumably resumed so tasks already queued are not left
+      // parked on a suspended queue - confirm.
+      inbound_dispatch_queue.resume()
+      on_completed.run()
+    }
+
+    // Addresses come from the HTTP request that initiated the web socket.
+    def getLocalAddress = new InetSocketAddress(request.getLocalAddr, request.getLocalPort)
+    def getRemoteAddress = new InetSocketAddress(request.getRemoteHost, request.getRemotePort)
+    // The scheme of the server's bind URI identifies the transport type.
+    def getTypeId = server.uri.getScheme
+
+    // NOTE(review): this reports true while connection is still null (before
+    // onOpen), which is also the state isDisposed reports as disposed -
+    // confirm whether `connection != null && connection.isOpen` was intended.
+    def isConnected = connection == null || connection.isOpen
+    def isDisposed = connection == null
+    def isFaultTolerant = false
+
+    // Reconnecting is not supported by this transport.
+    def reconnect(p1: URI) = throw new UnsupportedOperationException()
+
+    /**
+     * Returns this transport viewed as the requested type.
+     *
+     * @param target the type the caller wants
+     * @return this transport cast to target, or null when this transport is
+     *         not an instance of it
+     */
+    def narrow[T](target: Class[T]): T = {
+      val compatible = target.isAssignableFrom(getClass())
+      if (!compatible) {
+        return null
+      }
+      target.cast(this)
+    }
+
+ /////////////////////////////////////////////////////////////////////////
+ //
+ // WebSocket Lifecycle Callbacks...
+ //
+ /////////////////////////////////////////////////////////////////////////
+    // Web socket connection; null until onOpen is called.
+    var connection: WebSocket.Connection = null
+    // Close code and message once onClose has been processed, None before that.
+    var closed: Option[(Int, String)] = None
+
+    /**
+     * Web socket callback: records the connection, queues this transport on
+     * the server's pending list and asks the server to accept it.
+     */
+    def onOpen(connection: WebSocket.Connection): Unit = {
+      this.connection = connection
+      // put() blocks while the bounded pending queue is full.
+      server.pending_connects.put(this)
+      server.fire_accept
+    }
+
+    /**
+     * Web socket callback: on the dispatch queue, records the close details
+     * and schedules a drain on the inbound queue, where read() reports end of
+     * stream once the buffered data is used up.
+     */
+    def onClose(closeCode: Int, message: String) = dispatchQueue {
+      closed = Some(closeCode, message)
+      inbound_dispatch_queue {
+        drain_inbound
+      }
+    }
+
+ /////////////////////////////////////////////////////////////////////////
+ //
+ // This section handles in the inbound flow of messages
+ //
+ /////////////////////////////////////////////////////////////////////////
+
+    /**
+     * Text frames are handled like binary ones: the codecs only work on
+     * bytes, so the text is encoded as UTF-8 and passed on.
+     */
+    def onMessage(str: String): Unit = {
+      val utf8 = str.getBytes("UTF-8")
+      onMessage(utf8, 0, utf8.length)
+    }
+
+    // Inbound flow control budget in bytes (64k initially): onMessage takes
+    // from it and read() gives it back.
+    var inbound_capacity_remaining = 1024 * 64;
+    // Received buffers that read() has not consumed yet.
+    val inbound = ListBuffer[Buffer]()
+
+    // Queue on which inbound data is processed; initially dispatchQueue,
+    // replaced by a child queue in _start.
+    var inbound_dispatch_queue = dispatchQueue
+
+    /** Resumes the inbound queue and schedules a drain of buffered data on it. */
+    def resumeRead() = {
+      inbound_dispatch_queue.resume()
+      inbound_dispatch_queue {
+        drain_inbound
+      }
+    }
+
+    /** Suspends the inbound queue. */
+    def suspendRead() = inbound_dispatch_queue.suspend()
+
+    /**
+     * Web socket callback for binary data. Blocks the calling thread while
+     * the inbound budget is used up and the service is started, then charges
+     * the message against the budget and queues it for decoding on the
+     * inbound queue.
+     */
+    def onMessage(data: Array[Byte], offset: Int, length: Int): Unit = {
+      inbound.synchronized {
+        // flow control check..
+        // read() notifies this monitor after returning capacity.
+        while (inbound_capacity_remaining <= 0 && service_state.is_started) {
+          inbound.wait();
+        }
+        inbound_capacity_remaining -= length;
+      }
+      inbound_dispatch_queue {
+        // NOTE(review): the array is wrapped, not copied - assumes the caller
+        // does not reuse it after this callback returns; confirm.
+        inbound += new Buffer(data, offset, length)
+        drain_inbound
+      }
+    }
+
+
+    /** No-op: closing the byte channel view does not close the web socket. */
+    def close() {}
+
+    /**
+     * Whether the byte channel can still deliver data.
+     *
+     * The channel is at end of stream when the web socket has been closed and
+     * all buffered inbound data has been consumed; that is the state in which
+     * read() returns -1. In every other state it is open. The previous
+     * implementation returned the inverse of this.
+     */
+    def isOpen = closed == None || !inbound.isEmpty
+
+    /**
+     * Copies buffered inbound data into dest. Must run on the dispatch queue.
+     *
+     * @param dest buffer to fill
+     * @return the number of bytes copied, or -1 once the web socket is closed
+     *         and no buffered data is left
+     */
+    def read(dest: ByteBuffer): Int = {
+      dispatch_queue.assertExecuting()
+
+      val at_end_of_stream = inbound.isEmpty && closed != None
+      if (at_end_of_stream) {
+        return -1
+      }
+
+      var copied = 0
+      while (dest.hasRemaining && inbound.nonEmpty) {
+        val chunk = inbound.head
+        val count = math.min(chunk.length, dest.remaining())
+        dest.put(chunk.data, chunk.offset, count)
+        chunk.moveHead(count)
+        copied += count
+        // Drop the chunk once it has been fully consumed.
+        if (chunk.length == 0) {
+          inbound.remove(0)
+        }
+      }
+
+      // Return the consumed bytes to the inbound budget and wake a producer
+      // waiting in onMessage; done off the dispatch queue because it locks.
+      Broker.BLOCKABLE_THREAD_POOL {
+        inbound.synchronized {
+          inbound_capacity_remaining += copied
+          inbound.notify();
+        }
+      }
+      copied
+    }
+
+    /** Scattering read into all of the given buffers. */
+    def read(dsts: Array[ByteBuffer]): Long = read(dsts, 0, dsts.length)
+
+    /**
+     * Scattering read into length buffers starting at dsts(offset). Stops at
+     * the first buffer that could not be filled completely.
+     *
+     * @return the sum of the results of the individual reads
+     * @throws IndexOutOfBoundsException when offset and length do not select
+     *         a valid range of dsts
+     */
+    def read(dsts: Array[ByteBuffer], offset: Int, length: Int): Long = {
+      val invalid_range = offset < 0 || length < 0 || offset + length > dsts.length
+      if (invalid_range) {
+        throw new IndexOutOfBoundsException
+      }
+      var total = 0L
+      var step = 0
+      while (step < length) {
+        val target = dsts(offset + step)
+        if (target.hasRemaining) {
+          total += read(target)
+        }
+        // Still room left: the inbound data ran out, so stop here.
+        if (target.hasRemaining) {
+          return total
+        }
+        step += 1
+      }
+      total
+    }
+
+
+    /**
+     * Decodes commands from the buffered inbound data and delivers them to
+     * the transport listener until the codec has no complete command left,
+     * the service is no longer started or reads are suspended. Must run on
+     * the dispatch queue.
+     *
+     * An exception thrown by the listener while handling a command is
+     * reported back to it as a transport failure with the original exception
+     * attached as the cause; an IOException raised while decoding is
+     * reported as is.
+     */
+    protected def drain_inbound: Unit = {
+      dispatch_queue.assertExecuting()
+      try {
+        while (true) {
+          if (!service_state.is_started || inbound_dispatch_queue.isSuspended) {
+            return
+          }
+          val command = protocolCodec.read
+          if (command == null) {
+            // No complete command is buffered yet.
+            return
+          }
+          try {
+            transportListener.onTransportCommand(command)
+          } catch {
+            case e: Throwable =>
+              // Keep the original failure reachable for whoever handles the error.
+              val failure = new IOException("Transport listener failure.")
+              failure.initCause(e)
+              transportListener.onTransportFailure(failure)
+          }
+        }
+      } catch {
+        case e: IOException => transportListener.onTransportFailure(e)
+      }
+    }
+
+ /////////////////////////////////////////////////////////////////////////
+ //
+ // This section handles in the outbound flow of messages
+ //
+ /////////////////////////////////////////////////////////////////////////
+
+    /** True when no codec is installed or the codec reports itself full. */
+    def full() = protocolCodec == null || protocolCodec.full();
+
+    /**
+     * Encodes a command through the codec and schedules a flush. Must run on
+     * the dispatch queue.
+     *
+     * @return true when the codec accepted the command; false when the codec
+     *         is full or a failure was reported to the transport listener
+     */
+    def offer(command: AnyRef): Boolean = {
+      dispatchQueue.assertExecuting
+      try {
+        if (!service_state.is_started) {
+          throw new IOException("Not running.")
+        }
+        val accepted = protocolCodec.write(command) != BufferState.FULL
+        if (accepted) {
+          // Coalesced by the event source into a later flush.
+          drain_outbound_events.merge(1)
+        }
+        accepted
+      } catch {
+        case e: IOException =>
+          transportListener.onTransportFailure(e)
+          false
+      }
+    }
+
+    // Event source whose merged events trigger a flush of the codec.
+    val drain_outbound_events = Dispatch.createSource(EventAggregators.INTEGER_ADD, dispatchQueue)
+    drain_outbound_events.setEventHandler(^ { flush })
+    drain_outbound_events.resume
+
+    /**
+     * Flushes the codec's pending output while the service is started; an
+     * IOException is reported to the transport listener. Must run on the
+     * dispatch queue.
+     */
+    def flush: Unit = {
+      dispatchQueue.assertExecuting
+      if (service_state.is_started) {
+        try {
+          protocolCodec.flush
+        } catch {
+          case e: IOException =>
+            transportListener.onTransportFailure(e)
+        }
+      }
+    }
+
+    /** Gathering write of all of the given buffers. */
+    def write(srcs: Array[ByteBuffer]): Long = write(srcs, 0, srcs.length)
+
+    /**
+     * Gathering write of length buffers starting at srcs(offset). Stops at
+     * the first buffer that could not be written completely.
+     *
+     * @return the sum of the results of the individual writes
+     * @throws IndexOutOfBoundsException when offset and length do not select
+     *         a valid range of srcs
+     */
+    def write(srcs: Array[ByteBuffer], offset: Int, length: Int): Long = {
+      val invalid_range = offset < 0 || length < 0 || offset + length > srcs.length
+      if (invalid_range) {
+        throw new IndexOutOfBoundsException
+      }
+      var written: Long = 0
+      var step = 0
+      while (step < length) {
+        val source = srcs(offset + step)
+        if (source.hasRemaining) {
+          written += write(source)
+        }
+        // Data left over: the channel took only part of it, so stop here.
+        if (source.hasRemaining) {
+          return written
+        }
+        step += 1
+      }
+      written
+    }
+
+    // Outbound flow control budget in bytes (64k initially). Only read and
+    // written on the dispatch queue.
+    var outbound_capacity_remaining = 1024 * 64;
+    // Bytes handed to the web socket since the last drain. Only read and
+    // written by tasks of outbound_executor.
+    var outbound_drained = 0
+
+    // Sends run one at a time on the blockable pool because sendMessage may
+    // block. When the send queue runs dry the sent bytes are credited back on
+    // the dispatch queue and the codec is flushed again, so data refused
+    // while the budget was used up gets written.
+    val outbound_executor = new SerialExecutor(Broker.BLOCKABLE_THREAD_POOL) {
+      override def drained = {
+        val amount = outbound_drained
+        outbound_drained = 0
+        if (amount > 0) {
+          dispatchQueue {
+            outbound_capacity_remaining += amount
+            flush
+          }
+        }
+      }
+    }
+
+    /**
+     * Queues the remaining bytes of buf for sending as one binary web socket
+     * message. Must run on the dispatch queue.
+     *
+     * The bytes are copied, so the caller may reuse buf right away, and the
+     * copy starts at the buffer's current position. While the outbound budget
+     * is used up nothing is accepted and 0 is returned; a flush is triggered
+     * once queued sends have completed.
+     *
+     * @param buf data to send; its position is advanced past accepted bytes
+     * @return the number of bytes accepted: all remaining bytes, or 0
+     */
+    def write(buf: ByteBuffer) = {
+      dispatchQueue.assertExecuting
+      val remaining = buf.remaining()
+      if (remaining > 0 && outbound_capacity_remaining > 0) {
+        outbound_capacity_remaining -= remaining;
+        val data = new Array[Byte](remaining)
+        // Relative bulk get: copies from the current position and advances it.
+        buf.get(data)
+        outbound_executor {
+          try {
+            connection.sendMessage(data, 0, data.length)
+          } finally {
+            // Credit the bytes even when the send fails so the budget cannot leak.
+            outbound_drained += remaining
+          }
+        }
+        remaining
+      } else {
+        0
+      }
+    }
+
+ }
+
+
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/SerialExecutor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/SerialExecutor.scala?rev=1238934&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/SerialExecutor.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/SerialExecutor.scala Wed Feb 1 04:36:19 2012
@@ -0,0 +1,59 @@
+package org.apache.activemq.apollo.util
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
+
+/**
+ * <p>An executor that runs the tasks submitted to it one at a time, in
+ * submission order, using the wrapped executor to do the work.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class SerialExecutor(target: Executor) extends Executor {
+
+  // Tasks waiting to be run.
+  private final val queue = new ConcurrentLinkedQueue[Runnable]()
+  // True while a drain pass has been handed to the target executor.
+  private final val triggered: AtomicBoolean = new AtomicBoolean
+
+  // The unit of work submitted to the target executor.
+  private final val drain_task = new Runnable {
+    def run() = drain
+  }
+
+  /** Queues the task and schedules a drain pass unless one is already pending. */
+  final def execute(action: Runnable) {
+    queue.add(action)
+    val claimed = triggered.compareAndSet(false, true)
+    if (claimed) {
+      target.execute(drain_task);
+    }
+  }
+
+  // Runs one task; a failure goes to the thread's uncaught exception handler
+  // so that the remaining tasks still run.
+  private final def run_guarded(action: Runnable): Unit = {
+    try {
+      action.run
+    } catch {
+      case e: Throwable =>
+        val current: Thread = Thread.currentThread
+        current.getUncaughtExceptionHandler.uncaughtException(current, e)
+    }
+  }
+
+  // Runs queued tasks until the queue is empty, then releases the trigger.
+  // A task may be added between the last poll and the release, so the queue
+  // is checked again and the pass repeats if the trigger can be re-acquired.
+  private final def drain: Unit = {
+    while (true) {
+      try {
+        var next = queue.poll
+        while (next != null) {
+          run_guarded(next)
+          next = queue.poll
+        }
+      } finally {
+        drained
+        triggered.set(false)
+        val more_work = !queue.isEmpty && triggered.compareAndSet(false, true)
+        if (!more_work) {
+          return
+        }
+      }
+    }
+  }
+
+  /**
+   * Hook called after each drain pass, once the queue has been emptied.
+   * Subclasses can override it to do follow-up work.
+   */
+  protected def drained: Unit = {
+  }
+
+}
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1238934&r1=1238933&r2=1238934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Wed Feb 1 04:36:19 2012
@@ -70,7 +70,7 @@
<howl-version>0.1.8</howl-version>
<hsqldb-version>1.7.2.2</hsqldb-version>
<jdom-version>1.0</jdom-version>
- <jetty-version>7.1.6.v20100715</jetty-version>
+ <jetty-version>7.5.1.v20110908</jetty-version>
<jmock-version>1.0.1</jmock-version>
<junit-version>4.7</junit-version>
<jxta-version>2.0</jxta-version>