You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/07 20:51:53 UTC

svn commit: r1032371 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala

Author: chirino
Date: Sun Nov  7 19:51:52 2010
New Revision: 1032371

URL: http://svn.apache.org/viewvc?rev=1032371&view=rev
Log:
Simplifying the Sink layer a bit.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1032371&r1=1032370&r2=1032371&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Sun Nov  7 19:51:52 2010
@@ -20,6 +20,7 @@ import _root_.org.fusesource.hawtdispatc
 import org.fusesource.hawtdispatch._
 import java.util.{LinkedList}
 import org.apache.activemq.apollo.transport.Transport
+import com.sun.tools.javac.util.ListBuffer
 
 /**
  * <p>
@@ -67,7 +68,8 @@ class TransportSink(val transport:Transp
 
 /**
  * Implements a delivery sink which buffers the overflow of deliveries that
- * a 'down stream' sink cannot accept when it's full.
+ * a 'down stream' sink cannot accept when it's full.  An overflow sink
+ * always accepts offers even when it's full.
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
@@ -76,6 +78,10 @@ class OverflowSink[T](val downstream:Sin
   private var overflow = new LinkedList[T]()
   var refiller: Runnable = null
 
+  def overflowed = !overflow.isEmpty
+
+  def full = overflowed || downstream.full
+
   downstream.refiller = ^{ drain }
 
   protected def drain:Unit = {
@@ -111,9 +117,6 @@ class OverflowSink[T](val downstream:Sin
   protected def onDelivered(value:T) = {
   }
 
-  def overflowed = !overflow.isEmpty
-
-  def full = overflowed || downstream.full
 }
 
 object MapSink {
@@ -145,19 +148,16 @@ object MapSink {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class SinkMux[T](val target:Sink[T], val queue:DispatchQueue, val sizer:Sizer[T]) extends BaseRetained {
-
-  var sessions = List[Session]()
+class SinkMux[T](val downstream:Sink[T], val queue:DispatchQueue, val sizer:Sizer[T]) extends BaseRetained {
 
-  var session_min_credits = 1024*4;
-  var session_credit_capacity = 1024*32
-  var session_max_credits = session_credit_capacity;
+  var sessions = List[Session[T]]()
+  var session_max_credits = 1024*32;
 
-  val overflow = new OverflowSink[(Session,T)](MapSink(target){_._2}) {
+  val overflow = new OverflowSink[(Session[T],T)](MapSink(downstream){_._2}) {
 
     // Once a value leaves the overflow, then we can credit the
-    // session so that more messages can be accpted.
-    override protected def onDelivered(event:(Session,T)) = {
+    // session so that more messages can be accepted.
+    override protected def onDelivered(event:(Session[T],T)) = {
       val session = event._1
       val value = event._2
       session.credit_adder.merge(sizer.size(value));
@@ -167,7 +167,7 @@ class SinkMux[T](val target:Sink[T], val
   // As messages are delivered, and we credit the sessions,
   // that triggers the sessions to refill the overflow.  No
   // need to have a refiller action.
-  overflow.refiller = ^ { }
+  overflow.refiller = NOOP
 
   queue.retain
   setDisposer(^{
@@ -177,7 +177,7 @@ class SinkMux[T](val target:Sink[T], val
 
   // use a event aggregating source to coalesce multiple events from the same thread.
   // all the sessions send to the same source.
-  val source = createSource(new ListEventAggregator[(Session,T)](), queue)
+  val source = createSource(new ListEventAggregator[(Session[T],T)](), queue)
   source.setEventHandler(^{drain_source});
   source.resume
 
@@ -188,100 +188,95 @@ class SinkMux[T](val target:Sink[T], val
     }
   }
 
-  /**
-   * tracks one producer to consumer session / credit window.
-   */
-  class Session(val producer_queue:DispatchQueue) extends Sink[T] {
+  def open(producer_queue:DispatchQueue):Sink[T] = {
+    val session = createSession(producer_queue, session_max_credits)
+    sessions ::= session
+    session
+  }
 
-    // retain since the producer will be using this source to send messages
-    // to the consumer
-    source.retain
-
-    ///////////////////////////////////////////////////
-    // These members are used from the context of the
-    // producer serial dispatch queue
-    ///////////////////////////////////////////////////
-
-    // create a source to coalesce credit events back to the producer side...
-    val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
-    credit_adder.setEventHandler(^{
-      internal_credit(credit_adder.getData.intValue)
-    });
-    credit_adder.resume
-
-    private var credits = 0;
-    private var closed = false
-    var refiller:Runnable = null
-    var _full = false
-
-    def close = {
-      credit_adder.release
-      source.release
-      closed=true
+  def close(session:Sink[T]) = {
+    val s = session.asInstanceOf[Session[T]]
+    sessions = sessions.filterNot( _ == s )
+    s.producer_queue {
+      s.close
     }
+  }
 
-    override def full = _full
+  protected def createSession(producer_queue:DispatchQueue, capacity:Int) = new Session[T](producer_queue, capacity, this)
 
-    override def offer(value: T) = {
-      if( _full || closed ) {
-        false
-      } else {
-        credit_adder.retain
-        internal_credit(-sizer.size(value))
-        source.merge((this, value))
-        true
-      }
-    }
 
-    def internal_credit(value:Int) = {
-      credits += value;
-      if( closed || credits <= 0 ) {
-        _full = true
-      } else if( credits==session_max_credits ) {
-        // refill once we are empty.
-        if( _full ) {
-          _full  = false
-          refiller.run
-        }
+}
+
+/**
+ * tracks one producer to consumer session / credit window.
+ */
+class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends Sink[T] {
+
+  private def session_max_credits = mux.session_max_credits
+  private def sizer = mux.sizer
+  private def downstream = mux.source
+
+  // retain since the producer will be using this source to send messages
+  // to the consumer
+  downstream.retain
+
+  // create a source to coalesce credit events back to the producer side...
+  val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
+  credit_adder.setEventHandler(^{
+    add_credits(credit_adder.getData.intValue)
+  });
+  credit_adder.resume
+
+  private var closed = false
+  private var _full = false
+
+  private def add_credits(value:Int) = {
+    credits += value;
+    if( closed || credits <= 0 ) {
+      _full = true
+    } else if( credits==session_max_credits ) {
+      // refill once we are empty.
+      if( _full ) {
+        _full  = false
+        refiller.run
       }
     }
+  }
 
-    ///////////////////////////////////////////////////
-    // These members are used from the context of the
-    // consumer serial dispatch queue
-    ///////////////////////////////////////////////////
-
-    private var _capacity = 0
-
-    def credit(value:Int) = ^{
-      internal_credit(value)
-    } >>: producer_queue
-
-    def capacity(value:Int) = {
-      val change = value - _capacity;
-      _capacity = value;
-      credit(change)
-    }
+  ///////////////////////////////////////////////////
+  // These members are used from the context of the
+  // producer serial dispatch queue
+  ///////////////////////////////////////////////////
 
-  }
+  var refiller:Runnable = null
 
-  def open(producer_queue:DispatchQueue):Sink[T] = {
-    val session = createSession(producer_queue)
-    sessions = session :: sessions
-    session.capacity(session_max_credits)
-    session
+  override def full = {
+    assert(getCurrentQueue eq producer_queue)
+    _full
   }
 
-  def close(session:Sink[T]) = {
-    val s = session.asInstanceOf[SinkMux[T]#Session]
-    s.producer_queue {
-      s.close
+  override def offer(value: T) = {
+    assert(getCurrentQueue eq producer_queue)
+    if( _full || closed ) {
+      false
+    } else {
+      credit_adder.retain
+      add_credits(-sizer.size(value))
+      downstream.merge((this, value))
+      true
     }
   }
 
-  protected def createSession(producer_queue:DispatchQueue) = new Session(producer_queue)
+  def close = {
+    assert(getCurrentQueue eq producer_queue)
+    credit_adder.release
+    downstream.release
+    closed=true
+  }
+
 }
 
+
 /**
  * A sizer can determine the size of other objects.
  */