You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/20 01:44:11 UTC
svn commit: r1387818 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/
apollo-openwire/src/test/resources/
Author: chirino
Date: Wed Sep 19 23:44:10 2012
New Revision: 1387818
URL: http://svn.apache.org/viewvc?rev=1387818&view=rev
Log:
Clean up bits that are not needed with the new session sink design.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1387818&r1=1387817&r2=1387818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Sep 19 23:44:10 2012
@@ -346,14 +346,13 @@ class SessionSinkMux[T](val downstream:S
var sessions = HashSet[Session[T]]()
var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
- var overflow_size = 0L
def open(producer_queue:DispatchQueue):SessionSink[T] = {
val session = new Session[T](this, producer_queue)
consumer_queue <<| ^{
sessions += session
val bonus = size_credits / sessions.size
- session.consumer_side.size_bonus = bonus
+ session.size_bonus = bonus
session.credit(delivery_credits, 1+bonus);
schedual_rebalance
}
@@ -384,19 +383,8 @@ class SessionSinkMux[T](val downstream:S
def time_stamp = 0L
- downstream.refiller = ^{ drain_source.merge(0x01) }
-
- val drain_source = createSource(EventAggregators.INTEGER_OR, consumer_queue)
- drain_source.setEventHandler(^{
- var data = drain_source.getData
- if( data.intValue() > 0 ) {
- drain_overflow
- }
- })
- drain_source.resume()
-
var rebalance_schedualed = false
- def schedual_rebalance = {
+ def schedual_rebalance:Unit = {
if ( !rebalance_schedualed ) {
rebalance_schedualed = true
consumer_queue.after(550, TimeUnit.MILLISECONDS) {
@@ -407,8 +395,8 @@ class SessionSinkMux[T](val downstream:S
}
var last_rebalance_ts = time_stamp
- def rebalance_check = {
- // re-balance periodically since it's a bit expensive.
+ def rebalance_check:Unit = {
+ // only re-balance periodically since it's a bit expensive.
var now = time_stamp
if ( (now - last_rebalance_ts) > 500 && sessions.size>0) {
last_rebalance_ts = now
@@ -417,70 +405,50 @@ class SessionSinkMux[T](val downstream:S
}
def rebalance = {
- val bonus_size = size_credits / sessions.size
-
- // allocate bonus credits to reduce session stalls.
- val sessions_copy = sessions.toArray
-
var stalled_sessions = List[Session[T]]()
var total_stalls = 0L
for ( session <- sessions ) {
- val consumer_side = session.consumer_side
- if ( consumer_side.stall_counter > 0) {
- total_stalls += consumer_side.stall_counter
+ if ( session.stall_counter > 0) {
+ total_stalls += session.stall_counter
stalled_sessions ::= session
}
}
for ( session <- stalled_sessions ) {
- val consumer_side = session.consumer_side
- var slice_percent = consumer_side.stall_counter.toFloat / total_stalls
+ var slice_percent = session.stall_counter.toFloat / total_stalls
val new_size_bonus = (size_credits * slice_percent).toInt
- val change = new_size_bonus - consumer_side.size_bonus
- consumer_side.stall_counter = 0
- consumer_side.size_bonus += change
+ val change = new_size_bonus - session.size_bonus
+ session.stall_counter = 0
+ session.size_bonus += change
session.credit(0, change)
}
}
+ downstream.refiller = ^{ drain_overflow }
def drain_overflow:Unit = {
- consumer_queue.assertExecuting()
- rebalance_check
- while( !downstream.full && !overflowed_sessions.isEmpty) {
-
+ while( !overflowed_sessions.isEmpty) {
val session = overflowed_sessions.getHead.session
- val consumer_side = session.consumer_side
- val value = consumer_side.overflow.removeFirst()
- assert(value!=null)
-
- val accepted = downstream.offer((session, value))
- assert(accepted, "The downstream sink violated it's contract, an offer was not accepted but it had told us it was not full")
-
- if ( consumer_side.stall_counter > 0 ) {
- schedual_rebalance
- }
-
- overflow_size -= sizer.size(value)
- if( consumer_side.overflow.isEmpty ) {
- consumer_side.overflow_node.unlink()
- session.credit(consumer_side.pending_delivery_credits, consumer_side.pending_size_credits)
- consumer_side.pending_delivery_credits = 0
- consumer_side.pending_size_credits = 0
+ if( !downstream.full ) {
+ val value = session.overflow.removeFirst()
+ val accepted = downstream.offer((session, value))
+ assert(accepted)
+ if( session.stall_counter > 0 ) {
+ schedual_rebalance
+ }
+ if( session.overflow.isEmpty ) {
+ session.overflow_node.unlink()
+ } else {
+ // to fairly consume values from all sessions.
+ overflowed_sessions.rotate()
+ }
} else {
- // to fairly consume values from all sessions.
- overflowed_sessions.rotate()
+ return
}
}
}
def delivered(session:Session[Delivery], size:Int) = {
- val consumer_side = session.consumer_side
- if( overflow_size >= size_credits && !consumer_side.overflow.isEmpty) {
- consumer_side.pending_delivery_credits += 1
- consumer_side.pending_size_credits += size
- } else {
- session.credit(1, size)
- }
+ session.credit(1, size)
}
}
@@ -492,38 +460,28 @@ case class SessionLinkedNode[T](session:
*/
class Session[T](mux:SessionSinkMux[T], val producer_queue:DispatchQueue) extends SessionSink[T] {
- // the consumer_side object is mutated from the consumer dispatch queue...
+ // the following Session fields are mutated from the consumer dispatch queue...
// we should think about field padding this object to avoid false sharing on the cache lines.
- object consumer_side {
-
- val overflow = new LinkedList[T]()
- var pending_delivery_credits = 0
- var pending_size_credits = 0
- val overflow_node = SessionLinkedNode[T](Session.this)
- var stall_counter = 0
- var size_bonus = 0
-
- // use a event aggregating source to coalesce multiple events from the same thread.
- val source = createSource(new ListEventAggregator[(T, Boolean)](), mux.consumer_queue)
-
- source.setEventHandler(^{
- mux.consumer_queue.assertExecuting()
- for( value <- source.getData ) {
- if( overflow.isEmpty ) {
- mux.overflowed_sessions.addLast(overflow_node);
- }
- overflow.add(value._1)
- val size = sizer.size(value._1)
- mux.overflow_size += size
- if (value._2) {
- stall_counter += 1
- }
+ val overflow = new LinkedList[T]()
+ val overflow_node = SessionLinkedNode[T](Session.this)
+ var stall_counter = 0
+ var size_bonus = 0
+
+ // use a event aggregating source to coalesce multiple events from the same thread.
+ val source = createSource(new ListEventAggregator[(T, Boolean)](), mux.consumer_queue)
+ source.setEventHandler(^{
+ for( (value, stalled) <- source.getData ) {
+ if( overflow.isEmpty ) {
+ mux.overflowed_sessions.addLast(overflow_node);
}
- mux.drain_source.merge(0x02)
- });
- source.resume
-
- }
+ overflow.add(value)
+ if (stalled) {
+ stall_counter += 1
+ }
+ }
+ mux.drain_overflow
+ });
+ source.resume
// the rest of the Session fields are mutated from the producer dispatch queue...
var refiller:Task = NOOP
@@ -603,7 +561,7 @@ class Session[T](mux:SessionSinkMux[T],
add_credits(-1, -size)
val stalled = size_credits <= 0 || delivery_credits<=0
- consumer_side.source.merge((value, stalled))
+ source.merge((value, stalled))
}
true
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1387818&r1=1387817&r2=1387818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Sep 19 23:44:10 2012
@@ -832,7 +832,7 @@ class OpenwireProtocolHandler extends Pr
credit_window_filter.credit(info.getPrefetchSize, 0)
- val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery, info.getCurrentPrefetchSize.max(1), Integer.MAX_VALUE) {
+ val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery, info.getCurrentPrefetchSize.max(1), Integer.MAX_VALUE/2) {
override def time_stamp = broker.now
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml?rev=1387818&r1=1387817&r2=1387818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml Wed Sep 19 23:44:10 2012
@@ -31,7 +31,7 @@
<key_storage file="${basedir}/src/test/resources/apollo.ks" password="password" key_password="password"/>
- <web_admin bind="http://0.0.0.0:0"/>
+ <!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="ssl" bind="ssl://0.0.0.0:0" />
</broker>
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml?rev=1387818&r1=1387817&r2=1387818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml Wed Sep 19 23:44:10 2012
@@ -24,7 +24,7 @@
<key_storage file="${basedir}/src/test/resources/apollo.ks" password="password" key_password="password"/>
- <web_admin bind="http://0.0.0.0:0"/>
+ <!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="ssl" bind="ssl://0.0.0.0:0" />
</broker>
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml?rev=1387818&r1=1387817&r2=1387818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml Wed Sep 19 23:44:10 2012
@@ -23,7 +23,7 @@
<host_name>localhost</host_name>
</virtual_host>
- <web_admin bind="http://0.0.0.0:0"/>
+ <!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="tcp" bind="tcp://0.0.0.0:0"/>
</broker>
\ No newline at end of file