You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/07 20:51:53 UTC
svn commit: r1032371 -
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Author: chirino
Date: Sun Nov 7 19:51:52 2010
New Revision: 1032371
URL: http://svn.apache.org/viewvc?rev=1032371&view=rev
Log:
Simplifying the Sink layer a bit.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1032371&r1=1032370&r2=1032371&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Sun Nov 7 19:51:52 2010
@@ -20,6 +20,7 @@ import _root_.org.fusesource.hawtdispatc
import org.fusesource.hawtdispatch._
import java.util.{LinkedList}
import org.apache.activemq.apollo.transport.Transport
+import com.sun.tools.javac.util.ListBuffer
/**
* <p>
@@ -67,7 +68,8 @@ class TransportSink(val transport:Transp
/**
* Implements a delivery sink which buffers the overflow of deliveries that
- * a 'down stream' sink cannot accept when it's full.
+ * a 'down stream' sink cannot accept when it's full. An overflow sink
+ * always accepts offers even when it's full.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@@ -76,6 +78,10 @@ class OverflowSink[T](val downstream:Sin
private var overflow = new LinkedList[T]()
var refiller: Runnable = null
+ def overflowed = !overflow.isEmpty
+
+ def full = overflowed || downstream.full
+
downstream.refiller = ^{ drain }
protected def drain:Unit = {
@@ -111,9 +117,6 @@ class OverflowSink[T](val downstream:Sin
protected def onDelivered(value:T) = {
}
- def overflowed = !overflow.isEmpty
-
- def full = overflowed || downstream.full
}
object MapSink {
@@ -145,19 +148,16 @@ object MapSink {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class SinkMux[T](val target:Sink[T], val queue:DispatchQueue, val sizer:Sizer[T]) extends BaseRetained {
-
- var sessions = List[Session]()
+class SinkMux[T](val downstream:Sink[T], val queue:DispatchQueue, val sizer:Sizer[T]) extends BaseRetained {
- var session_min_credits = 1024*4;
- var session_credit_capacity = 1024*32
- var session_max_credits = session_credit_capacity;
+ var sessions = List[Session[T]]()
+ var session_max_credits = 1024*32;
- val overflow = new OverflowSink[(Session,T)](MapSink(target){_._2}) {
+ val overflow = new OverflowSink[(Session[T],T)](MapSink(downstream){_._2}) {
// Once a value leaves the overflow, then we can credit the
- // session so that more messages can be accpted.
- override protected def onDelivered(event:(Session,T)) = {
+ // session so that more messages can be accepted.
+ override protected def onDelivered(event:(Session[T],T)) = {
val session = event._1
val value = event._2
session.credit_adder.merge(sizer.size(value));
@@ -167,7 +167,7 @@ class SinkMux[T](val target:Sink[T], val
// As messages are delivered, and we credit the sessions,
// that triggers the sessions to refill the overflow. No
// need to have a refiller action.
- overflow.refiller = ^ { }
+ overflow.refiller = NOOP
queue.retain
setDisposer(^{
@@ -177,7 +177,7 @@ class SinkMux[T](val target:Sink[T], val
// use a event aggregating source to coalesce multiple events from the same thread.
// all the sessions send to the same source.
- val source = createSource(new ListEventAggregator[(Session,T)](), queue)
+ val source = createSource(new ListEventAggregator[(Session[T],T)](), queue)
source.setEventHandler(^{drain_source});
source.resume
@@ -188,100 +188,95 @@ class SinkMux[T](val target:Sink[T], val
}
}
- /**
- * tracks one producer to consumer session / credit window.
- */
- class Session(val producer_queue:DispatchQueue) extends Sink[T] {
+ def open(producer_queue:DispatchQueue):Sink[T] = {
+ val session = createSession(producer_queue, session_max_credits)
+ sessions ::= session
+ session
+ }
- // retain since the producer will be using this source to send messages
- // to the consumer
- source.retain
-
- ///////////////////////////////////////////////////
- // These members are used from the context of the
- // producer serial dispatch queue
- ///////////////////////////////////////////////////
-
- // create a source to coalesce credit events back to the producer side...
- val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
- credit_adder.setEventHandler(^{
- internal_credit(credit_adder.getData.intValue)
- });
- credit_adder.resume
-
- private var credits = 0;
- private var closed = false
- var refiller:Runnable = null
- var _full = false
-
- def close = {
- credit_adder.release
- source.release
- closed=true
+ def close(session:Sink[T]) = {
+ val s = session.asInstanceOf[Session[T]]
+ sessions = sessions.filterNot( _ == s )
+ s.producer_queue {
+ s.close
}
+ }
- override def full = _full
+ protected def createSession(producer_queue:DispatchQueue, capacity:Int) = new Session[T](producer_queue, capacity, this)
- override def offer(value: T) = {
- if( _full || closed ) {
- false
- } else {
- credit_adder.retain
- internal_credit(-sizer.size(value))
- source.merge((this, value))
- true
- }
- }
- def internal_credit(value:Int) = {
- credits += value;
- if( closed || credits <= 0 ) {
- _full = true
- } else if( credits==session_max_credits ) {
- // refill once we are empty.
- if( _full ) {
- _full = false
- refiller.run
- }
+}
+
+/**
+ * tracks one producer to consumer session / credit window.
+ */
+class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends Sink[T] {
+
+ private def session_max_credits = mux.session_max_credits
+ private def sizer = mux.sizer
+ private def downstream = mux.source
+
+ // retain since the producer will be using this source to send messages
+ // to the consumer
+ downstream.retain
+
+ // create a source to coalesce credit events back to the producer side...
+ val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
+ credit_adder.setEventHandler(^{
+ add_credits(credit_adder.getData.intValue)
+ });
+ credit_adder.resume
+
+ private var closed = false
+ private var _full = false
+
+ private def add_credits(value:Int) = {
+ credits += value;
+ if( closed || credits <= 0 ) {
+ _full = true
+ } else if( credits==session_max_credits ) {
+ // refill once we are empty.
+ if( _full ) {
+ _full = false
+ refiller.run
}
}
+ }
- ///////////////////////////////////////////////////
- // These members are used from the context of the
- // consumer serial dispatch queue
- ///////////////////////////////////////////////////
-
- private var _capacity = 0
-
- def credit(value:Int) = ^{
- internal_credit(value)
- } >>: producer_queue
-
- def capacity(value:Int) = {
- val change = value - _capacity;
- _capacity = value;
- credit(change)
- }
+ ///////////////////////////////////////////////////
+ // These members are used from the context of the
+ // producer serial dispatch queue
+ ///////////////////////////////////////////////////
- }
+ var refiller:Runnable = null
- def open(producer_queue:DispatchQueue):Sink[T] = {
- val session = createSession(producer_queue)
- sessions = session :: sessions
- session.capacity(session_max_credits)
- session
+ override def full = {
+ assert(getCurrentQueue eq producer_queue)
+ _full
}
- def close(session:Sink[T]) = {
- val s = session.asInstanceOf[SinkMux[T]#Session]
- s.producer_queue {
- s.close
+ override def offer(value: T) = {
+ assert(getCurrentQueue eq producer_queue)
+ if( _full || closed ) {
+ false
+ } else {
+ credit_adder.retain
+ add_credits(-sizer.size(value))
+ downstream.merge((this, value))
+ true
}
}
- protected def createSession(producer_queue:DispatchQueue) = new Session(producer_queue)
+ def close = {
+ assert(getCurrentQueue eq producer_queue)
+ credit_adder.release
+ downstream.release
+ closed=true
+ }
+
}
+
/**
* A sizer can determine the size of other objects.
*/