You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/07/14 22:30:01 UTC
svn commit: r1146886 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/...
Author: chirino
Date: Thu Jul 14 20:29:59 2011
New Revision: 1146886
URL: http://svn.apache.org/viewvc?rev=1146886&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-34 : Allow a STOMP subscription to control message flow using a credit window
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Jul 14 20:29:59 2011
@@ -74,7 +74,7 @@ class Queue(val router: LocalRouter, val
ack_source.setEventHandler(^ {drain_acks});
ack_source.resume
- val session_manager = new SinkMux[Delivery](messages, dispatch_queue, Delivery)
+ val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery)
// sequence numbers.. used to track what's in the store.
var message_seq_counter = 1L
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Thu Jul 14 20:29:59 2011
@@ -18,9 +18,9 @@ package org.apache.activemq.apollo.broke
import _root_.org.fusesource.hawtdispatch._
import org.fusesource.hawtdispatch._
-import java.util.{LinkedList}
+import java.util.LinkedList
import org.apache.activemq.apollo.transport.Transport
-import collection.mutable.{HashSet, ListBuffer}
+import collection.mutable.HashSet
/**
* <p>
@@ -52,29 +52,31 @@ trait Sink[T] {
def refiller:Runnable
def refiller_=(value:Runnable)
- def map[Y](func: Y=>T ):Sink[Y] = {
- def outer = Sink.this
- new Sink[Y]() with SinkFilter {
- def downstream = outer
- def offer(value:Y) = {
- if( full ) {
- false
- } else {
- outer.offer(func(value))
- }
- }
- }
+ def map[Y](func: Y=>T ):Sink[Y] = new SinkMapper[Y,T] {
+ def passing(value: Y) = func(value)
+ def downstream = Sink.this
}
}
-trait SinkFilter {
- def downstream:Sink[_]
+trait SinkFilter[T] {
+ def downstream:Sink[T]
def refiller:Runnable = downstream.refiller
def refiller_=(value:Runnable) { downstream.refiller=value }
def full: Boolean = downstream.full
}
+trait SinkMapper[T,X] extends Sink[T] with SinkFilter[X] {
+ def offer(value:T) = {
+ if( full ) {
+ false
+ } else {
+ downstream.offer(passing(value))
+ }
+ }
+ def passing(value:T):X
+}
+
/**
* <p>
* A delivery sink which is connected to a transport. It expects the caller's dispatch
@@ -179,11 +181,58 @@ class MutableSink[T] extends Sink[T] {
}
+class SinkMux[T](val downstream:Sink[T]) {
+ var sinks = HashSet[Sink[T]]()
+
+ downstream.refiller = ^{
+ sinks.foreach { sink =>
+ sink.refiller.run()
+ }
+ }
+
+ def open():Sink[T] = {
+ val sink = new Sink[T] {
+ var refiller:Runnable = NOOP
+ def offer(value: T) = downstream.offer(value)
+ def full = downstream.full
+ }
+ sinks += sink
+ sink
+ }
+
+ def close(sink:Sink[T]):Unit = {
+ sinks -= sink
+ }
+}
+
+class CreditWindowFilter[T](val downstream:Sink[T], val sizer:Sizer[T]) extends SinkMapper[T,T] {
+
+ var byte_credits = 0
+ var delivery_credits = 0
+
+ override def full: Boolean = downstream.full || ( byte_credits <= 0 && delivery_credits <= 0 )
+
+ def passing(value: T) = {
+ byte_credits -= sizer.size(value)
+ delivery_credits -= 1
+ value
+ }
+
+ def credit(byte_credits:Int, delivery_credits:Int) = {
+ val was_full = full
+ this.byte_credits += byte_credits
+ this.delivery_credits += delivery_credits
+ if( was_full && !full ) {
+ refiller.run()
+ }
+ }
+}
+
trait SessionSink[T] extends Sink[T] {
def remaining_capacity:Int
}
-object SinkMux {
+object SessionSinkMux {
val default_session_max_credits = System.getProperty("apollo.default_session_max_credits", ""+(1024*32)).toInt
}
@@ -198,7 +247,7 @@ object SinkMux {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class SinkMux[T](val downstream:Sink[T], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
+class SessionSinkMux[T](val downstream:Sink[T], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
var sessions = HashSet[Session[T]]()
@@ -237,7 +286,7 @@ class SinkMux[T](val downstream:Sink[T],
sessions.foreach(_.credit_adder.resume)
}
- def open(producer_queue:DispatchQueue, credits:Int=SinkMux.default_session_max_credits):SessionSink[T] = {
+ def open(producer_queue:DispatchQueue, credits:Int=SessionSinkMux.default_session_max_credits):SessionSink[T] = {
val session = new Session[T](producer_queue, 0, this)
consumer_queue <<| ^{
if( overflow.full ) {
@@ -266,7 +315,7 @@ class SinkMux[T](val downstream:Sink[T],
/**
* tracks one producer to consumer session / credit window.
*/
-class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends SessionSink[T] {
+class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SessionSinkMux[T]) extends SessionSink[T] {
var refiller:Runnable = NOOP
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Jul 14 20:29:59 2011
@@ -73,7 +73,7 @@ class OpenwireProtocolHandler extends Pr
def protocol = PROTOCOL
- var outbound_sessions: SinkMux[Command] = null
+ var outbound_sessions: SessionSinkMux[Command] = null
var connection_session: Sink[Command] = null
var closed = false
@@ -176,7 +176,7 @@ class OpenwireProtocolHandler extends Pr
override def on_transport_connected():Unit = {
security_context.local_address = connection.transport.getLocalAddress
security_context.remote_address = connection.transport.getRemoteAddress
- outbound_sessions = new SinkMux[Command](connection.transport_sink.map {
+ outbound_sessions = new SessionSinkMux[Command](connection.transport_sink.map {
x:Command =>
x.setCommandId(next_command_id)
debug("sending openwire command: %s", x.toString())
@@ -785,7 +785,7 @@ class OpenwireProtocolHandler extends Pr
}
}
- def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter {
+ def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter[Command] {
retain
def producer = p
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Thu Jul 14 20:29:59 2011
@@ -384,6 +384,7 @@ object Stomp {
val ACK_MODE = ascii("ack")
val ID = ascii("id")
val SELECTOR = ascii("selector")
+ val CREDIT = ascii("credit")
val LOGIN = ascii("login")
val PASSCODE = ascii("passcode")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Jul 14 20:29:59 2011
@@ -39,7 +39,6 @@ import org.apache.activemq.apollo.transp
import java.security.cert.X509Certificate
import collection.mutable.{ListBuffer, HashMap}
import java.io.IOException
-import javax.management.remote.rmi._RMIConnection_Stub
case class RichBuffer(self:Buffer) extends Proxy {
@@ -128,132 +127,213 @@ class StompProtocolHandler extends Proto
protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
- trait AckHandler {
- def track(delivery:Delivery):Unit
- def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
- }
+ class StompConsumer(
- class AutoAckHandler extends AckHandler {
- def track(delivery:Delivery) = {
- if( delivery.ack!=null ) {
- delivery.ack(Delivered, null)
- }
- }
+ val subscription_id:Option[AsciiBuffer],
+ val destination:Array[DestinationDTO],
+ ack_mode:AsciiBuffer,
+ val selector:(String, BooleanExpression),
+ override val browser:Boolean,
+ override val exclusive:Boolean,
+ val auto_delete:Boolean,
+ val initial_credit_window:(Int,Int, Boolean)
+ ) extends BaseRetained with DeliveryConsumer {
+
+//// The following comes in handy if we need to debug the
+//// reference counts of the consumers.
+//
+// val r = new BaseRetained
+//
+// def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
+// def retained: Int =r.retained
+//
+// def printST(name:String) = {
+// val e = new Exception
+// println(name+": ")
+// println(" "+e.getStackTrace.drop(1).take(4).mkString("\n "))
+// }
+//
+// def retain: Unit = {
+// printST("retain")
+// r.retain
+// }
+// def release: Unit = {
+// printST("release")
+// r.release
+// }
- def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
- async_die("The subscription ack mode does not expect ACK or NACK frames")
+ trait AckHandler {
+ def track(delivery:Delivery):Unit
+ def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit
+ def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
}
- }
+ class AutoAckHandler extends AckHandler {
+ def track(delivery:Delivery) = {
+ if( delivery.ack!=null ) {
+ delivery.ack(Delivered, null)
+ }
+ credit_window_filter.credit(delivery.size, 1)
+ }
- class SessionAckHandler extends AckHandler{
- var consumer_acks = ListBuffer[(AsciiBuffer, (DeliveryResult, StoreUOW)=>Unit)]()
+ def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
+ }
- def track(delivery:Delivery) = {
- queue.apply {
- if( protocol_version eq V1_0 ) {
- // register on the connection since 1.0 acks may not include the subscription id
- connection_ack_handlers += ( delivery.message.id-> this )
- }
- consumer_acks += (( delivery.message.id, delivery.ack ))
+ def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+ async_die("The subscription ack mode does not expect ACK or NACK frames")
}
}
+ class TrackedAck(var credit:Option[Int], val ack:(DeliveryResult, StoreUOW)=>Unit)
- def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+ class SessionAckHandler extends AckHandler{
+ var consumer_acks = ListBuffer[(AsciiBuffer, TrackedAck)]()
- // session acks ack all previously recieved messages..
- var found = false
- val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
- if( found ) {
- false
- } else {
- if( id == msgid ) {
- found = true
+ def track(delivery:Delivery) = {
+ queue.apply {
+ if( protocol_version eq V1_0 ) {
+ // register on the connection since 1.0 acks may not include the subscription id
+ connection_ack_handlers += ( delivery.message.id-> this )
}
- true
+ consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack )
}
}
- if( acked.isEmpty ) {
- async_die("ACK failed, invalid message id: %s".format(msgid))
- } else {
- consumer_acks = not_acked
- acked.foreach{case (id, ack)=>
- if( ack!=null ) {
- ack(consumed, uow)
+ def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
+ if( initial_credit_window._3 ) {
+ var found = false
+ val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+ if( found ) {
+ false
+ } else {
+ if( id == msgid ) {
+ found = true
+ }
+ true
+ }
+ }
+
+ for( (id, delivery) <- acked ) {
+ for( credit <- delivery.credit ) {
+ credit_window_filter.credit(credit, 1)
+ delivery.credit = None
+ }
+ }
+ } else {
+ if( credit_value!=null ) {
+ credit_window_filter.credit(credit_value._1, credit_value._2)
}
}
}
- if( protocol_version eq V1_0 ) {
- connection_ack_handlers.remove(msgid)
- }
- }
+ def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
- }
- class MessageAckHandler extends AckHandler {
- var consumer_acks = HashMap[AsciiBuffer, (DeliveryResult, StoreUOW)=>Unit]()
+ // session acks ack all previously recieved messages..
+ var found = false
+ val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+ if( found ) {
+ false
+ } else {
+ if( id == msgid ) {
+ found = true
+ }
+ true
+ }
+ }
+
+ if( acked.isEmpty ) {
+ println("ACK failed, invalid message id: %s".format(msgid))
+ } else {
+ consumer_acks = not_acked
+ if( acked.size != 1 ) {
+ println("ACKS: %s".format(acked.map(_._1))+" uow "+uow)
+ }
+ acked.foreach{case (id, delivery)=>
+ if( delivery.ack!=null ) {
+ delivery.ack(consumed, uow)
+ }
+ }
+ }
- def track(delivery:Delivery) = {
- queue.apply {
if( protocol_version eq V1_0 ) {
- // register on the connection since 1.0 acks may not include the subscription id
- connection_ack_handlers += ( delivery.message.id-> this )
+ connection_ack_handlers.remove(msgid)
}
- consumer_acks += ( delivery.message.id -> delivery.ack )
}
+
}
- def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
- consumer_acks.remove(msgid) match {
- case Some(ack) =>
- if( ack!=null ) {
- ack(consumed, uow)
+ class MessageAckHandler extends AckHandler {
+ var consumer_acks = HashMap[AsciiBuffer, TrackedAck]()
+
+ def track(delivery:Delivery) = {
+ queue.apply {
+ if( protocol_version eq V1_0 ) {
+ // register on the connection since 1.0 acks may not include the subscription id
+ connection_ack_handlers += ( delivery.message.id-> this )
+ }
+ consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack)
+ }
+ }
+
+ def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
+ if( initial_credit_window._3 ) {
+ for( delivery <- consumer_acks.get(msgid)) {
+ for( credit <- delivery.credit ) {
+ credit_window_filter.credit(credit, 1)
+ delivery.credit = None
+ }
}
- case None => async_die("ACK failed, invalid message id: %s".format(msgid))
+ } else {
+ if( credit_value!=null ) {
+ credit_window_filter.credit(credit_value._1, credit_value._2)
+ }
+ }
}
- if( protocol_version eq V1_0 ) {
- connection_ack_handlers.remove(msgid)
+ def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+ consumer_acks.remove(msgid) match {
+ case Some(delivery) =>
+ if( delivery.ack!=null ) {
+ delivery.ack(consumed, uow)
+ }
+ case None => async_die("ACK failed, invalid message id: %s".format(msgid))
+ }
+
+ if( protocol_version eq V1_0 ) {
+ connection_ack_handlers.remove(msgid)
+ }
}
}
- }
- class StompConsumer(
+ val ack_handler = ack_mode match {
+ case ACK_MODE_AUTO=>new AutoAckHandler
+ case ACK_MODE_NONE=>new AutoAckHandler
+ case ACK_MODE_CLIENT=> new SessionAckHandler
+ case ACK_MODE_SESSION=> new SessionAckHandler
+ case ACK_MODE_MESSAGE=> new MessageAckHandler
+ case ack:AsciiBuffer =>
+ die("Unsuported ack mode: "+ack);
+ }
- val subscription_id:Option[AsciiBuffer],
- val destination:Array[DestinationDTO],
- val ack_handler:AckHandler,
- val selector:(String, BooleanExpression),
- override val browser:Boolean,
- override val exclusive:Boolean,
- val auto_delete:Boolean
- ) extends BaseRetained with DeliveryConsumer {
+ val consumer_sink = sink_manager.open()
+ val credit_window_filter = new CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
+ ack_handler.track(delivery)
+ var frame = delivery.message.asInstanceOf[StompFrameMessage].frame
+ if( subscription_id != None ) {
+ frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
+ }
+ frame
+ }, Delivery)
-//// The following comes in handy if we need to debug the
-//// reference counts of the consumers.
-//
-// val r = new BaseRetained
-//
-// def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
-// def retained: Int =r.retained
-//
-// def printST(name:String) = {
-// val e = new Exception
-// println(name+": ")
-// println(" "+e.getStackTrace.drop(1).take(4).mkString("\n "))
-// }
-//
-// def retain: Unit = {
-// printST("retain")
-// r.retain
-// }
-// def release: Unit = {
-// printST("release")
-// r.release
-// }
+ credit_window_filter.credit(initial_credit_window._1, initial_credit_window._2)
+
+ val session_manager = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery)
+
+ override def dispose() = dispatchQueue {
+ super.dispose()
+ sink_manager.close(consumer_sink)
+ }
val dispatch_queue = StompProtocolHandler.this.dispatchQueue
@@ -274,7 +354,7 @@ class StompProtocolHandler extends Proto
}
}
- def connect(p:DeliveryProducer) = new DeliverySession {
+ def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter[Delivery] {
// This session object should only be used from the dispatch queue context
// of the producer.
@@ -287,9 +367,9 @@ class StompProtocolHandler extends Proto
def consumer = StompConsumer.this
var closed = false
- val session = session_manager.open(producer.dispatch_queue, codec.write_buffer_size)
+ val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
- def remaining_capacity = session.remaining_capacity
+ def remaining_capacity = downstream.remaining_capacity
def close = {
assert(producer.dispatch_queue.isExecuting)
@@ -303,16 +383,20 @@ class StompProtocolHandler extends Proto
frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
}
- if( session.full ) {
+ val delivery = new Delivery()
+ delivery.message = StompFrameMessage(frame)
+ delivery.size = frame.size
+
+ if( downstream.full ) {
// session is full so use an overflow sink so to hold the message,
// and then trigger closing the session once it empties out.
- val sink = new OverflowSink(session)
+ val sink = new OverflowSink(downstream)
sink.refiller = ^{
dispose
}
- sink.offer(frame)
+ sink.offer(delivery)
} else {
- session.offer(frame)
+ downstream.offer(delivery)
dispose
}
} else {
@@ -322,7 +406,7 @@ class StompProtocolHandler extends Proto
}
def dispose = {
- session_manager.close(session)
+ session_manager.close(downstream)
if( auto_delete ) {
reset {
val rc = host.router.delete(destination, security_context)
@@ -338,30 +422,22 @@ class StompProtocolHandler extends Proto
}
// Delegate all the flow control stuff to the session
- def full = session.full
def offer(delivery:Delivery) = {
- if( session.full ) {
+ if( full ) {
false
} else {
- ack_handler.track(delivery)
- var frame = delivery.message.asInstanceOf[StompFrameMessage].frame
- if( subscription_id != None ) {
- frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
- }
- frame.retain
- val rc = session.offer(frame)
+ delivery.message.retain()
+ val rc = downstream.offer(delivery)
assert(rc, "offer should be accepted since it was not full")
true
}
}
- def refiller = session.refiller
- def refiller_=(value:Runnable) = { session.refiller=value }
-
}
}
- var session_manager:SinkMux[StompFrame] = null
+// var session_manager:SessionSinkMux[StompFrame] = null
+ var sink_manager:SinkMux[StompFrame] = null
var connection_sink:Sink[StompFrame] = null
var dead = false
@@ -379,7 +455,7 @@ class StompProtocolHandler extends Proto
private def queue = connection.dispatch_queue
// uses by STOMP 1.0 clients
- var connection_ack_handlers = HashMap[AsciiBuffer, AckHandler]()
+ var connection_ack_handlers = HashMap[AsciiBuffer, StompConsumer#AckHandler]()
var protocol_version:AsciiBuffer = _
@@ -490,13 +566,11 @@ class StompProtocolHandler extends Proto
}
override def on_transport_connected() = {
-
- session_manager = new SinkMux[StompFrame]( connection.transport_sink.map {x=>
+ sink_manager = new SinkMux[StompFrame]( connection.transport_sink.map {x=>
trace("sending frame: %s", x)
x
- }, dispatchQueue, StompFrame)
- connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
- connection_sink.refiller = NOOP
+ })
+ connection_sink = new OverflowSink(sink_manager.open());
resumeRead
}
@@ -920,6 +994,20 @@ class StompProtocolHandler extends Proto
var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
var exclusive = get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
var auto_delete = get(headers, AUTO_DELETE).map( _ == TRUE ).getOrElse(false)
+ val ack_mode = get(headers, ACK_MODE).getOrElse(ACK_MODE_AUTO)
+ val credit_window = get(headers, CREDIT) match {
+ case Some(value) =>
+ value.toString.split(",").toList match {
+ case x :: Nil =>
+ (codec.write_buffer_size, x.toInt, true)
+ case x :: y :: Nil =>
+ (y.toInt, x.toInt, true)
+ case x :: y :: z :: _ =>
+ (y.toInt, x.toInt, z.toBoolean)
+ }
+ case None =>
+ (codec.write_buffer_size, 1, true)
+ }
if(auto_delete) {
if( destination.length != 1 ) {
@@ -931,19 +1019,6 @@ class StompProtocolHandler extends Proto
}
}
- val ack = get(headers, ACK_MODE) match {
- case None=> new AutoAckHandler
- case Some(x)=> x match {
- case ACK_MODE_AUTO=>new AutoAckHandler
- case ACK_MODE_NONE=>new AutoAckHandler
- case ACK_MODE_CLIENT=> new SessionAckHandler
- case ACK_MODE_SESSION=> new SessionAckHandler
- case ACK_MODE_MESSAGE=> new MessageAckHandler
- case ack:AsciiBuffer =>
- die("Unsuported ack mode: "+ack);
- }
- }
-
val selector = get(headers, SELECTOR) match {
case None=> null
case Some(x)=> x
@@ -975,7 +1050,7 @@ class StompProtocolHandler extends Proto
}
}
- val consumer = new StompConsumer(subscription_id, destination, ack, selector, browser, exclusive, auto_delete);
+ val consumer = new StompConsumer(subscription_id, destination, ack_mode, selector, browser, exclusive, auto_delete, credit_window);
consumers += (id -> consumer)
reset {
@@ -1051,7 +1126,22 @@ class StompProtocolHandler extends Proto
}
def on_stomp_ack(headers:HeaderMap, consumed:DeliveryResult):Unit = {
- val messageId = get(headers, MESSAGE_ID).getOrElse(die("message id header not set"))
+ val credit = get(headers, CREDIT) match {
+ case None => null
+ case Some(value) =>
+ value.toString.split(",").toList match {
+ case x :: Nil =>
+ (0, x.toInt)
+ case x :: y :: _ =>
+ (y.toInt, x.toInt)
+ }
+
+ }
+ val messageId = get(headers, MESSAGE_ID).getOrElse(null)
+
+ if( credit==null && messageId==null) {
+ die("message id header not set")
+ }
val subscription_id = get(headers, SUBSCRIPTION);
val handler = subscription_id match {
@@ -1065,13 +1155,16 @@ class StompProtocolHandler extends Proto
}
handler.foreach{ handler=>
- get(headers, TRANSACTION) match {
- case None=>
- handler.perform_ack(consumed, messageId, null)
- case Some(txid)=>
- get_or_create_tx_queue(txid).add{ uow=>
- handler.perform_ack(consumed, messageId, uow)
- }
+ handler.credit(messageId, credit)
+ if( messageId!=null ) {
+ get(headers, TRANSACTION) match {
+ case None=>
+ handler.perform_ack(consumed, messageId, null)
+ case Some(txid)=>
+ get_or_create_tx_queue(txid).add{ uow=>
+ handler.perform_ack(consumed, messageId, uow)
+ }
+ }
}
send_receipt(headers)
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala Thu Jul 14 20:29:59 2011
@@ -187,7 +187,7 @@ trait Watchog extends RemoteConsumer {
dispatch_queue.executeAfter(seconds, TimeUnit.SECONDS, ^ {
if (messageCount == lastMessageCount) {
warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
- stop
+ stop()
} else {
watchdog(messageCount)
}
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Thu Jul 14 20:29:59 2011
@@ -1130,6 +1130,82 @@ Example:
this message will expire on Tue Jun 21 17:02:28 EDT 2011
^@
+### Subscription Flow Control
+
+You can add a `credit` header to the `SUBSCRIBE` frame to control the
+flow of messages delivered to the client. Each subscription maintains 2
+credit windows which allows you to control the flow of messages either based
+on number of messages sent or total number of content bytes delivered to the
+client. The content bytes are the body of the frame which is usually also
+set on the `content-length` header. If either of the two credit windows have
+a positive value, the server will deliver messages to the client.
+
+The `credit` header value is expected to use the
+`count[,size[,auto]]` syntax where:
+
+ * count: initial setting of the credit window tracking the remaining number
+ of messages that can be sent.
+ * size: initial setting of the credit window tracking the remaining number
+ of content bytes that can be sent.
+ * auto: `true` or `false` controls if the credit windows will be
+ automatically incremented.
+
+The `credit` header defaults to `1,65536,true`. The `size`, and
+`auto` parts are optional, and if not specified the default values are used.
+This default setting allows you to receive both the case of:
+
+ * many small messages before needing an ack
+ * at least 1 big (larger than 65536 bytes) message before needing.
+
+
+As messages are sent from the server to the client, the credit windows are
+reduced by the corresponding amount. If the `auto` option is true, then when
+the client send `ACK` frames to the server, the credit windows are
+incremented by the number of messages and content sizes corresponding to the
+messages that were acknowledged.
+
+Example:
+
+ SUBSCRIBE
+ id:mysub
+ destination:/queue/foo
+ credit:5,0
+
+ ^@
+
+The above example would cause the subscription to only receive at most 5
+messages if the client is not sending any `ACK` messages back to the server.
+
+If the `auto` option is set to `false`, then the credit windows are only
+increased when the server receives `ACK` frames which contain a `credit`
+header. The header value is expected to use the `count[,size]` syntax. If
+`size` is specified, it defaults to 0.
+
+Example:
+
+ SUBSCRIBE
+ id:mysub
+ destination:/queue/foo
+ credit:1,0,false
+
+ ^@
+ ACK
+ id:mysub
+ message-id:id-52321
+ credit:1,1204
+
+ ^@
+
+You can also use the `ACK` frame to increase the credit windows sizes
+without needing to acknowledge as message. To do this, don't include the
+`message-id` header in the `ACK` frame. Example:
+
+ ACK
+ id:mysub
+ credit:3
+
+ ^@
+
### Topic Durable Subscriptions
A durable subscription is a queue which is subscribed to a topic so that