You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/07/14 22:30:01 UTC

svn commit: r1146886 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/...

Author: chirino
Date: Thu Jul 14 20:29:59 2011
New Revision: 1146886

URL: http://svn.apache.org/viewvc?rev=1146886&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-34 : Allow a STOMP subscription to control message flow using a credit window

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Jul 14 20:29:59 2011
@@ -74,7 +74,7 @@ class Queue(val router: LocalRouter, val
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
-  val session_manager = new SinkMux[Delivery](messages, dispatch_queue, Delivery)
+  val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery)
 
   // sequence numbers.. used to track what's in the store.
   var message_seq_counter = 1L

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Thu Jul 14 20:29:59 2011
@@ -18,9 +18,9 @@ package org.apache.activemq.apollo.broke
 
 import _root_.org.fusesource.hawtdispatch._
 import org.fusesource.hawtdispatch._
-import java.util.{LinkedList}
+import java.util.LinkedList
 import org.apache.activemq.apollo.transport.Transport
-import collection.mutable.{HashSet, ListBuffer}
+import collection.mutable.HashSet
 
 /**
  * <p>
@@ -52,29 +52,31 @@ trait Sink[T] {
   def refiller:Runnable
   def refiller_=(value:Runnable)
 
-  def map[Y](func: Y=>T ):Sink[Y] = {
-    def outer = Sink.this
-    new Sink[Y]() with SinkFilter {
-      def downstream = outer
-      def offer(value:Y) = {
-        if( full ) {
-          false
-        } else {
-          outer.offer(func(value))
-        }
-      }
-    }
+  def map[Y](func: Y=>T ):Sink[Y] = new SinkMapper[Y,T] {
+    def passing(value: Y) = func(value)
+    def downstream = Sink.this
   }
 
 }
 
-trait SinkFilter {
-  def downstream:Sink[_]
+trait SinkFilter[T] {
+  def downstream:Sink[T]
   def refiller:Runnable = downstream.refiller
   def refiller_=(value:Runnable) { downstream.refiller=value }
   def full: Boolean = downstream.full
 }
 
+trait SinkMapper[T,X] extends Sink[T] with SinkFilter[X] {
+  def offer(value:T) = {
+    if( full ) {
+      false
+    } else {
+      downstream.offer(passing(value))
+    }
+  }
+  def passing(value:T):X
+}
+
 /**
  * <p>
  * A delivery sink which is connected to a transport. It expects the caller's dispatch
@@ -179,11 +181,58 @@ class MutableSink[T] extends Sink[T] {
 }
 
 
+class SinkMux[T](val downstream:Sink[T]) {
+  var sinks = HashSet[Sink[T]]()
+
+  downstream.refiller = ^{
+    sinks.foreach { sink =>
+      sink.refiller.run()
+    }
+  }
+
+  def open():Sink[T] = {
+    val sink = new Sink[T] {
+      var refiller:Runnable = NOOP
+      def offer(value: T) = downstream.offer(value)
+      def full = downstream.full
+    }
+    sinks += sink
+    sink
+  }
+
+  def close(sink:Sink[T]):Unit = {
+    sinks -= sink
+  }
+}
+
+class CreditWindowFilter[T](val downstream:Sink[T], val sizer:Sizer[T]) extends SinkMapper[T,T] {
+
+  var byte_credits = 0
+  var delivery_credits = 0
+
+  override def full: Boolean = downstream.full || ( byte_credits <= 0 && delivery_credits <= 0 )
+
+  def passing(value: T) = {
+    byte_credits -= sizer.size(value)
+    delivery_credits -= 1
+    value
+  }
+
+  def credit(byte_credits:Int, delivery_credits:Int) = {
+    val was_full = full
+    this.byte_credits += byte_credits
+    this.delivery_credits += delivery_credits
+    if( was_full && !full ) {
+      refiller.run()
+    }
+  }
+}
+
 trait SessionSink[T] extends Sink[T] {
   def remaining_capacity:Int
 }
 
-object SinkMux {
+object SessionSinkMux {
   val default_session_max_credits = System.getProperty("apollo.default_session_max_credits", ""+(1024*32)).toInt
 }
 
@@ -198,7 +247,7 @@ object SinkMux {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class SinkMux[T](val downstream:Sink[T], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
+class SessionSinkMux[T](val downstream:Sink[T], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
 
   var sessions = HashSet[Session[T]]()
 
@@ -237,7 +286,7 @@ class SinkMux[T](val downstream:Sink[T],
     sessions.foreach(_.credit_adder.resume)
   }
 
-  def open(producer_queue:DispatchQueue, credits:Int=SinkMux.default_session_max_credits):SessionSink[T] = {
+  def open(producer_queue:DispatchQueue, credits:Int=SessionSinkMux.default_session_max_credits):SessionSink[T] = {
     val session = new Session[T](producer_queue, 0, this)
     consumer_queue <<| ^{
       if( overflow.full ) {
@@ -266,7 +315,7 @@ class SinkMux[T](val downstream:Sink[T],
 /**
  * tracks one producer to consumer session / credit window.
  */
-class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends SessionSink[T] {
+class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SessionSinkMux[T]) extends SessionSink[T] {
 
   var refiller:Runnable = NOOP
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Jul 14 20:29:59 2011
@@ -73,7 +73,7 @@ class OpenwireProtocolHandler extends Pr
 
   def protocol = PROTOCOL
 
-  var outbound_sessions: SinkMux[Command] = null
+  var outbound_sessions: SessionSinkMux[Command] = null
   var connection_session: Sink[Command] = null
   var closed = false
 
@@ -176,7 +176,7 @@ class OpenwireProtocolHandler extends Pr
   override def on_transport_connected():Unit = {
     security_context.local_address = connection.transport.getLocalAddress
     security_context.remote_address = connection.transport.getRemoteAddress
-    outbound_sessions = new SinkMux[Command](connection.transport_sink.map {
+    outbound_sessions = new SessionSinkMux[Command](connection.transport_sink.map {
       x:Command =>
         x.setCommandId(next_command_id)
         debug("sending openwire command: %s", x.toString())
@@ -785,7 +785,7 @@ class OpenwireProtocolHandler extends Pr
       }
     }
 
-    def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter {
+    def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter[Command] {
       retain
 
       def producer = p

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Thu Jul 14 20:29:59 2011
@@ -384,6 +384,7 @@ object Stomp {
   val ACK_MODE = ascii("ack")
   val ID = ascii("id")
   val SELECTOR = ascii("selector")
+  val CREDIT = ascii("credit")
 
   val LOGIN = ascii("login")
   val PASSCODE = ascii("passcode")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Jul 14 20:29:59 2011
@@ -39,7 +39,6 @@ import org.apache.activemq.apollo.transp
 import java.security.cert.X509Certificate
 import collection.mutable.{ListBuffer, HashMap}
 import java.io.IOException
-import javax.management.remote.rmi._RMIConnection_Stub
 
 
 case class RichBuffer(self:Buffer) extends Proxy {
@@ -128,132 +127,213 @@ class StompProtocolHandler extends Proto
 
   protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
 
-  trait AckHandler {
-    def track(delivery:Delivery):Unit
-    def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
-  }
+  class StompConsumer(
 
-  class AutoAckHandler extends AckHandler {
-    def track(delivery:Delivery) = {
-      if( delivery.ack!=null ) {
-        delivery.ack(Delivered, null)
-      }
-    }
+    val subscription_id:Option[AsciiBuffer],
+    val destination:Array[DestinationDTO],
+    ack_mode:AsciiBuffer,
+    val selector:(String, BooleanExpression),
+    override val browser:Boolean,
+    override val exclusive:Boolean,
+    val auto_delete:Boolean,
+    val initial_credit_window:(Int,Int, Boolean)
+  ) extends BaseRetained with DeliveryConsumer {
+
+////  The following comes in handy if we need to debug the
+////  reference counts of the consumers.
+//
+//    val r = new BaseRetained
+//
+//    def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
+//    def retained: Int =r.retained
+//
+//    def printST(name:String) = {
+//      val e = new Exception
+//      println(name+": ")
+//      println("  "+e.getStackTrace.drop(1).take(4).mkString("\n  "))
+//    }
+//
+//    def retain: Unit = {
+//      printST("retain")
+//      r.retain
+//    }
+//    def release: Unit = {
+//      printST("release")
+//      r.release
+//    }
 
-    def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
-      async_die("The subscription ack mode does not expect ACK or NACK frames")
+    trait AckHandler {
+      def track(delivery:Delivery):Unit
+      def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit
+      def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
     }
 
-  }
+    class AutoAckHandler extends AckHandler {
+      def track(delivery:Delivery) = {
+        if( delivery.ack!=null ) {
+          delivery.ack(Delivered, null)
+        }
+        credit_window_filter.credit(delivery.size, 1)
+      }
 
-  class SessionAckHandler extends AckHandler{
-    var consumer_acks = ListBuffer[(AsciiBuffer, (DeliveryResult, StoreUOW)=>Unit)]()
+      def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
+      }
 
-    def track(delivery:Delivery) = {
-      queue.apply {
-        if( protocol_version eq V1_0 ) {
-          // register on the connection since 1.0 acks may not include the subscription id
-          connection_ack_handlers += ( delivery.message.id-> this )
-        }
-        consumer_acks += (( delivery.message.id, delivery.ack ))
+      def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+        async_die("The subscription ack mode does not expect ACK or NACK frames")
       }
 
     }
 
+    class TrackedAck(var credit:Option[Int], val ack:(DeliveryResult, StoreUOW)=>Unit)
 
-    def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+    class SessionAckHandler extends AckHandler{
+      var consumer_acks = ListBuffer[(AsciiBuffer, TrackedAck)]()
 
-      // session acks ack all previously recieved messages..
-      var found = false
-      val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
-        if( found ) {
-          false
-        } else {
-          if( id == msgid ) {
-            found = true
+      def track(delivery:Delivery) = {
+        queue.apply {
+          if( protocol_version eq V1_0 ) {
+            // register on the connection since 1.0 acks may not include the subscription id
+            connection_ack_handlers += ( delivery.message.id-> this )
           }
-          true
+          consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack )
         }
       }
 
-      if( acked.isEmpty ) {
-        async_die("ACK failed, invalid message id: %s".format(msgid))
-      } else {
-        consumer_acks = not_acked
-        acked.foreach{case (id, ack)=>
-          if( ack!=null ) {
-            ack(consumed, uow)
+      def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
+        if( initial_credit_window._3 ) {
+          var found = false
+          val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+            if( found ) {
+              false
+            } else {
+              if( id == msgid ) {
+                found = true
+              }
+              true
+            }
+          }
+
+          for( (id, delivery) <- acked ) {
+            for( credit <- delivery.credit ) {
+              credit_window_filter.credit(credit, 1)
+              delivery.credit = None
+            }
+          }
+        } else {
+          if( credit_value!=null ) {
+            credit_window_filter.credit(credit_value._1, credit_value._2)
           }
         }
       }
 
-      if( protocol_version eq V1_0 ) {
-        connection_ack_handlers.remove(msgid)
-      }
-    }
+      def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
 
-  }
-  class MessageAckHandler extends AckHandler {
-    var consumer_acks = HashMap[AsciiBuffer, (DeliveryResult, StoreUOW)=>Unit]()
+        // session acks ack all previously received messages..
+        var found = false
+        val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+          if( found ) {
+            false
+          } else {
+            if( id == msgid ) {
+              found = true
+            }
+            true
+          }
+        }
+
+        if( acked.isEmpty ) {
+          println("ACK failed, invalid message id: %s".format(msgid))
+        } else {
+          consumer_acks = not_acked
+          if( acked.size != 1 ) {
+            println("ACKS: %s".format(acked.map(_._1))+" uow "+uow)
+          }
+          acked.foreach{case (id, delivery)=>
+            if( delivery.ack!=null ) {
+              delivery.ack(consumed, uow)
+            }
+          }
+        }
 
-    def track(delivery:Delivery) = {
-      queue.apply {
         if( protocol_version eq V1_0 ) {
-          // register on the connection since 1.0 acks may not include the subscription id
-          connection_ack_handlers += ( delivery.message.id-> this )
+          connection_ack_handlers.remove(msgid)
         }
-        consumer_acks += ( delivery.message.id -> delivery.ack )
       }
+
     }
 
-    def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
-      consumer_acks.remove(msgid) match {
-        case Some(ack) =>
-          if( ack!=null ) {
-            ack(consumed, uow)
+    class MessageAckHandler extends AckHandler {
+      var consumer_acks = HashMap[AsciiBuffer, TrackedAck]()
+
+      def track(delivery:Delivery) = {
+        queue.apply {
+          if( protocol_version eq V1_0 ) {
+            // register on the connection since 1.0 acks may not include the subscription id
+            connection_ack_handlers += ( delivery.message.id-> this )
+          }
+          consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack)
+        }
+      }
+
+      def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
+        if( initial_credit_window._3 ) {
+          for( delivery <- consumer_acks.get(msgid)) {
+            for( credit <- delivery.credit ) {
+              credit_window_filter.credit(credit, 1)
+              delivery.credit = None
+            }
           }
-        case None => async_die("ACK failed, invalid message id: %s".format(msgid))
+        } else {
+          if( credit_value!=null ) {
+            credit_window_filter.credit(credit_value._1, credit_value._2)
+          }
+        }
       }
 
-      if( protocol_version eq V1_0 ) {
-        connection_ack_handlers.remove(msgid)
+      def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+        consumer_acks.remove(msgid) match {
+          case Some(delivery) =>
+            if( delivery.ack!=null ) {
+              delivery.ack(consumed, uow)
+            }
+          case None => async_die("ACK failed, invalid message id: %s".format(msgid))
+        }
+
+        if( protocol_version eq V1_0 ) {
+          connection_ack_handlers.remove(msgid)
+        }
       }
     }
-  }
 
-  class StompConsumer(
+    val ack_handler = ack_mode match {
+      case ACK_MODE_AUTO=>new AutoAckHandler
+      case ACK_MODE_NONE=>new AutoAckHandler
+      case ACK_MODE_CLIENT=> new SessionAckHandler
+      case ACK_MODE_SESSION=> new SessionAckHandler
+      case ACK_MODE_MESSAGE=> new MessageAckHandler
+      case ack:AsciiBuffer =>
+        die("Unsuported ack mode: "+ack);
+    }
 
-    val subscription_id:Option[AsciiBuffer],
-    val destination:Array[DestinationDTO],
-    val ack_handler:AckHandler,
-    val selector:(String, BooleanExpression),
-    override val browser:Boolean,
-    override val exclusive:Boolean,
-    val auto_delete:Boolean
-  ) extends BaseRetained with DeliveryConsumer {
+    val consumer_sink = sink_manager.open()
+    val credit_window_filter = new CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
+      ack_handler.track(delivery)
+      var frame = delivery.message.asInstanceOf[StompFrameMessage].frame
+      if( subscription_id != None ) {
+        frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
+      }
+      frame
+    }, Delivery)
 
-////  The following comes in handy if we need to debug the
-////  reference counts of the consumers.
-//
-//    val r = new BaseRetained
-//
-//    def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
-//    def retained: Int =r.retained
-//
-//    def printST(name:String) = {
-//      val e = new Exception
-//      println(name+": ")
-//      println("  "+e.getStackTrace.drop(1).take(4).mkString("\n  "))
-//    }
-//
-//    def retain: Unit = {
-//      printST("retain")
-//      r.retain
-//    }
-//    def release: Unit = {
-//      printST("release")
-//      r.release
-//    }
+    credit_window_filter.credit(initial_credit_window._1, initial_credit_window._2)
+
+    val session_manager = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery)
+
+    override def dispose() = dispatchQueue {
+      super.dispose()
+      sink_manager.close(consumer_sink)
+    }
 
     val dispatch_queue = StompProtocolHandler.this.dispatchQueue
 
@@ -274,7 +354,7 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    def connect(p:DeliveryProducer) = new DeliverySession {
+    def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter[Delivery] {
 
       // This session object should only be used from the dispatch queue context
       // of the producer.
@@ -287,9 +367,9 @@ class StompProtocolHandler extends Proto
       def consumer = StompConsumer.this
       var closed = false
 
-      val session = session_manager.open(producer.dispatch_queue, codec.write_buffer_size)
+      val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
 
-      def remaining_capacity = session.remaining_capacity
+      def remaining_capacity = downstream.remaining_capacity
 
       def close = {
         assert(producer.dispatch_queue.isExecuting)
@@ -303,16 +383,20 @@ class StompProtocolHandler extends Proto
               frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
             }
 
-            if( session.full ) {
+            val delivery = new Delivery()
+            delivery.message = StompFrameMessage(frame)
+            delivery.size = frame.size
+
+            if( downstream.full ) {
               // session is full so use an overflow sink so to hold the message,
               // and then trigger closing the session once it empties out.
-              val sink = new OverflowSink(session)
+              val sink = new OverflowSink(downstream)
               sink.refiller = ^{
                 dispose
               }
-              sink.offer(frame)
+              sink.offer(delivery)
             } else {
-              session.offer(frame)
+              downstream.offer(delivery)
               dispose
             }
           } else {
@@ -322,7 +406,7 @@ class StompProtocolHandler extends Proto
       }
 
       def dispose = {
-        session_manager.close(session)
+        session_manager.close(downstream)
         if( auto_delete ) {
           reset {
             val rc = host.router.delete(destination, security_context)
@@ -338,30 +422,22 @@ class StompProtocolHandler extends Proto
       }
 
       // Delegate all the flow control stuff to the session
-      def full = session.full
       def offer(delivery:Delivery) = {
-        if( session.full ) {
+        if( full ) {
           false
         } else {
-          ack_handler.track(delivery)
-          var frame = delivery.message.asInstanceOf[StompFrameMessage].frame
-          if( subscription_id != None ) {
-            frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
-          }
-          frame.retain
-          val rc = session.offer(frame)
+          delivery.message.retain()
+          val rc = downstream.offer(delivery)
           assert(rc, "offer should be accepted since it was not full")
           true
         }
       }
 
-      def refiller = session.refiller
-      def refiller_=(value:Runnable) = { session.refiller=value }
-
     }
   }
 
-  var session_manager:SinkMux[StompFrame] = null
+//  var session_manager:SessionSinkMux[StompFrame] = null
+  var sink_manager:SinkMux[StompFrame] = null
   var connection_sink:Sink[StompFrame] = null
 
   var dead = false
@@ -379,7 +455,7 @@ class StompProtocolHandler extends Proto
   private def queue = connection.dispatch_queue
 
   // uses by STOMP 1.0 clients
-  var connection_ack_handlers = HashMap[AsciiBuffer, AckHandler]()
+  var connection_ack_handlers = HashMap[AsciiBuffer, StompConsumer#AckHandler]()
 
   var protocol_version:AsciiBuffer = _
 
@@ -490,13 +566,11 @@ class StompProtocolHandler extends Proto
   }
 
   override def on_transport_connected() = {
-
-    session_manager = new SinkMux[StompFrame]( connection.transport_sink.map {x=>
+    sink_manager = new SinkMux[StompFrame]( connection.transport_sink.map {x=>
       trace("sending frame: %s", x)
       x
-    }, dispatchQueue, StompFrame)
-    connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
-    connection_sink.refiller = NOOP
+    })
+    connection_sink = new OverflowSink(sink_manager.open());
     resumeRead
   }
 
@@ -920,6 +994,20 @@ class StompProtocolHandler extends Proto
     var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
     var exclusive = get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
     var auto_delete = get(headers, AUTO_DELETE).map( _ == TRUE ).getOrElse(false)
+    val ack_mode = get(headers, ACK_MODE).getOrElse(ACK_MODE_AUTO)
+    val credit_window = get(headers, CREDIT) match {
+      case Some(value) =>
+        value.toString.split(",").toList match {
+          case x :: Nil =>
+            (codec.write_buffer_size, x.toInt, true)
+          case x :: y :: Nil =>
+            (y.toInt, x.toInt, true)
+          case x :: y :: z :: _ =>
+            (y.toInt, x.toInt, z.toBoolean)
+        }
+      case None =>
+        (codec.write_buffer_size, 1, true)
+    }
 
     if(auto_delete) {
       if( destination.length != 1 ) {
@@ -931,19 +1019,6 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    val ack = get(headers, ACK_MODE) match {
-      case None=> new AutoAckHandler
-      case Some(x)=> x match {
-        case ACK_MODE_AUTO=>new AutoAckHandler
-        case ACK_MODE_NONE=>new AutoAckHandler
-        case ACK_MODE_CLIENT=> new SessionAckHandler
-        case ACK_MODE_SESSION=> new SessionAckHandler
-        case ACK_MODE_MESSAGE=> new MessageAckHandler
-        case ack:AsciiBuffer =>
-          die("Unsuported ack mode: "+ack);
-      }
-    }
-
     val selector = get(headers, SELECTOR) match {
       case None=> null
       case Some(x)=> x
@@ -975,7 +1050,7 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    val consumer = new StompConsumer(subscription_id, destination, ack, selector, browser, exclusive, auto_delete);
+    val consumer = new StompConsumer(subscription_id, destination, ack_mode, selector, browser, exclusive, auto_delete, credit_window);
     consumers += (id -> consumer)
 
     reset {
@@ -1051,7 +1126,22 @@ class StompProtocolHandler extends Proto
   }
 
   def on_stomp_ack(headers:HeaderMap, consumed:DeliveryResult):Unit = {
-    val messageId = get(headers, MESSAGE_ID).getOrElse(die("message id header not set"))
+    val credit = get(headers, CREDIT) match {
+      case None => null
+      case Some(value) =>
+        value.toString.split(",").toList match {
+          case x :: Nil =>
+            (0, x.toInt)
+          case x :: y :: _ =>
+            (y.toInt, x.toInt)
+        }
+
+    }
+    val messageId = get(headers, MESSAGE_ID).getOrElse(null)
+
+    if( credit==null && messageId==null) {
+      die("message id header not set")
+    }
 
     val subscription_id = get(headers, SUBSCRIPTION);
     val handler = subscription_id match {
@@ -1065,13 +1155,16 @@ class StompProtocolHandler extends Proto
     }
 
     handler.foreach{ handler=>
-      get(headers, TRANSACTION) match {
-        case None=>
-          handler.perform_ack(consumed, messageId, null)
-        case Some(txid)=>
-          get_or_create_tx_queue(txid).add{ uow=>
-            handler.perform_ack(consumed, messageId, uow)
-          }
+      handler.credit(messageId, credit)
+      if( messageId!=null ) {
+        get(headers, TRANSACTION) match {
+          case None=>
+            handler.perform_ack(consumed, messageId, null)
+          case Some(txid)=>
+            get_or_create_tx_queue(txid).add{ uow=>
+              handler.perform_ack(consumed, messageId, uow)
+            }
+        }
       }
       send_receipt(headers)
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala Thu Jul 14 20:29:59 2011
@@ -187,7 +187,7 @@ trait Watchog extends RemoteConsumer {
     dispatch_queue.executeAfter(seconds, TimeUnit.SECONDS, ^ {
       if (messageCount == lastMessageCount) {
         warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
-        stop
+        stop()
       } else {
         watchdog(messageCount)
       }

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1146886&r1=1146885&r2=1146886&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Thu Jul 14 20:29:59 2011
@@ -1130,6 +1130,82 @@ Example:
     this message will expire on Tue Jun 21 17:02:28 EDT 2011
     ^@
 
+### Subscription Flow Control
+
+You can add a `credit` header to the `SUBSCRIBE` frame to control the
+flow of messages delivered to the client. Each subscription maintains 2
+credit windows which allow you to control the flow of messages based either
+on the number of messages sent or on the total number of content bytes
+delivered to the client. The content bytes are the bytes of the frame body,
+whose length is usually also set in the `content-length` header. If either
+of the two credit windows has a positive value, the server will deliver messages to the client.
+
+The `credit` header value is expected to use the
+`count[,size[,auto]]` syntax where:
+
+ * count: initial setting of the credit window tracking the remaining number 
+   of messages that can be sent.
+ * size: initial setting of the credit window tracking the remaining number 
+   of content bytes that can be sent.
+ * auto: `true` or `false` controls if the credit windows will be 
+   automatically incremented.
+
+The `credit` header defaults to `1,65536,true`. The `size` and
+`auto` parts are optional; if not specified, their default values are used.
+This default setting covers both of the following cases:
+
+ * many small messages before needing an ack
+ * at least 1 big (larger than 65536 bytes) message before needing an ack.
+
+
+As messages are sent from the server to the client, the credit windows are
+reduced by the corresponding amount. If the `auto` option is true, then when
+the client sends `ACK` frames to the server, the credit windows are
+incremented by the number of messages and content sizes corresponding to the
+messages that were acknowledged.
+
+Example:
+
+    SUBSCRIBE
+    id:mysub
+    destination:/queue/foo
+    credit:5,0
+    
+    ^@
+
+The above example would cause the subscription to only receive at most 5
+messages if the client is not sending any `ACK` messages back to the server.
+
+If the `auto` option is set to `false`, then the credit windows are only
+increased when the server receives `ACK` frames which contain a `credit`
+header. The header value is expected to use the `count[,size]` syntax. If
+`size` is not specified, it defaults to 0.
+
+Example:
+
+    SUBSCRIBE
+    id:mysub
+    destination:/queue/foo
+    credit:1,0,false
+    
+    ^@
+    ACK
+    id:mysub
+    message-id:id-52321
+    credit:1,1204
+    
+    ^@
+
+You can also use the `ACK` frame to increase the credit window sizes
+without needing to acknowledge a message. To do this, don't include the
+`message-id` header in the `ACK` frame. Example:
+
+    ACK
+    id:mysub
+    credit:3
+    
+    ^@
+
 ### Topic Durable Subscriptions
 
 A durable subscription is a queue which is subscribed to a topic so that