You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/20 01:44:11 UTC

svn commit: r1387818 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-openwire/src/test/resources/

Author: chirino
Date: Wed Sep 19 23:44:10 2012
New Revision: 1387818

URL: http://svn.apache.org/viewvc?rev=1387818&view=rev
Log:
Clean up bits that are not needed with the new session sink design.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1387818&r1=1387817&r2=1387818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Sep 19 23:44:10 2012
@@ -346,14 +346,13 @@ class SessionSinkMux[T](val downstream:S
 
   var sessions = HashSet[Session[T]]()
   var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
-  var overflow_size = 0L
 
   def open(producer_queue:DispatchQueue):SessionSink[T] = {
     val session = new Session[T](this, producer_queue)
     consumer_queue <<| ^{
       sessions += session
       val bonus = size_credits / sessions.size
-      session.consumer_side.size_bonus = bonus
+      session.size_bonus = bonus
       session.credit(delivery_credits, 1+bonus);
       schedual_rebalance
     }
@@ -384,19 +383,8 @@ class SessionSinkMux[T](val downstream:S
 
   def time_stamp = 0L
 
-  downstream.refiller = ^{ drain_source.merge(0x01) }
-
-  val drain_source = createSource(EventAggregators.INTEGER_OR, consumer_queue)
-  drain_source.setEventHandler(^{
-    var data = drain_source.getData
-    if( data.intValue() > 0 ) {
-      drain_overflow
-    }
-  })
-  drain_source.resume()
-
   var rebalance_schedualed = false
-  def schedual_rebalance = {
+  def schedual_rebalance:Unit = {
     if ( !rebalance_schedualed ) {
       rebalance_schedualed  = true
       consumer_queue.after(550, TimeUnit.MILLISECONDS) {
@@ -407,8 +395,8 @@ class SessionSinkMux[T](val downstream:S
   }
 
   var last_rebalance_ts = time_stamp
-  def rebalance_check = {
-    // re-balance periodically since it's a bit expensive.
+  def rebalance_check:Unit = {
+    // only re-balance periodically since it's a bit expensive.
     var now = time_stamp
     if ( (now - last_rebalance_ts) > 500 && sessions.size>0) {
       last_rebalance_ts = now
@@ -417,70 +405,50 @@ class SessionSinkMux[T](val downstream:S
   }
 
   def rebalance = {
-    val bonus_size = size_credits / sessions.size
-
-    // allocate bonus credits to reduce session stalls.
-    val sessions_copy = sessions.toArray
-
     var stalled_sessions = List[Session[T]]()
     var total_stalls = 0L
     for ( session <- sessions ) {
-      val consumer_side = session.consumer_side
-      if ( consumer_side.stall_counter > 0) {
-        total_stalls += consumer_side.stall_counter
+      if ( session.stall_counter > 0) {
+        total_stalls += session.stall_counter
         stalled_sessions ::= session
       }
     }
 
     for ( session <- stalled_sessions ) {
-      val consumer_side = session.consumer_side
-      var slice_percent = consumer_side.stall_counter.toFloat / total_stalls
+      var slice_percent = session.stall_counter.toFloat / total_stalls
       val new_size_bonus = (size_credits * slice_percent).toInt
-      val change = new_size_bonus - consumer_side.size_bonus
-      consumer_side.stall_counter = 0
-      consumer_side.size_bonus += change
+      val change = new_size_bonus - session.size_bonus
+      session.stall_counter = 0
+      session.size_bonus += change
       session.credit(0, change)
     }
   }
+  downstream.refiller = ^{ drain_overflow }
 
   def drain_overflow:Unit = {
-    consumer_queue.assertExecuting()
-    rebalance_check
-    while( !downstream.full && !overflowed_sessions.isEmpty) {
-
+    while( !overflowed_sessions.isEmpty) {
       val session = overflowed_sessions.getHead.session
-      val consumer_side = session.consumer_side
-      val value = consumer_side.overflow.removeFirst()
-      assert(value!=null)
-
-      val accepted = downstream.offer((session, value))
-      assert(accepted, "The downstream sink violated it's contract, an offer was not accepted but it had told us it was not full")
-
-      if (  consumer_side.stall_counter > 0  ) {
-        schedual_rebalance
-      }
-
-      overflow_size -= sizer.size(value)
-      if( consumer_side.overflow.isEmpty ) {
-        consumer_side.overflow_node.unlink()
-        session.credit(consumer_side.pending_delivery_credits, consumer_side.pending_size_credits)
-        consumer_side.pending_delivery_credits = 0
-        consumer_side.pending_size_credits = 0
+      if( !downstream.full ) {
+        val value = session.overflow.removeFirst()
+        val accepted = downstream.offer((session, value))
+        assert(accepted)
+        if( session.stall_counter > 0 ) {
+          schedual_rebalance
+        }
+        if( session.overflow.isEmpty ) {
+          session.overflow_node.unlink()
+        } else {
+          // to fairly consume values from all sessions.
+          overflowed_sessions.rotate()
+        }
       } else {
-        // to fairly consume values from all sessions.
-        overflowed_sessions.rotate()
+        return
       }
     }
   }
 
   def delivered(session:Session[Delivery], size:Int) = {
-    val consumer_side = session.consumer_side
-    if( overflow_size >= size_credits && !consumer_side.overflow.isEmpty) {
-      consumer_side.pending_delivery_credits += 1
-      consumer_side.pending_size_credits += size
-    } else {
-      session.credit(1, size)
-    }
+    session.credit(1, size)
   }
 
 }
@@ -492,38 +460,28 @@ case class SessionLinkedNode[T](session:
  */
 class Session[T](mux:SessionSinkMux[T], val producer_queue:DispatchQueue) extends SessionSink[T] {
 
-  // the consumer_side object is mutated from the consumer dispatch queue...
+  // the following Session fields are mutated from the consumer dispatch queue...
   // we should think about field padding this object to avoid false sharing on the cache lines.
-  object consumer_side {
-
-    val overflow = new LinkedList[T]()
-    var pending_delivery_credits = 0
-    var pending_size_credits = 0
-    val overflow_node = SessionLinkedNode[T](Session.this)
-    var stall_counter = 0
-    var size_bonus = 0
-
-    // use a event aggregating source to coalesce multiple events from the same thread.
-    val source = createSource(new ListEventAggregator[(T, Boolean)](), mux.consumer_queue)
-
-    source.setEventHandler(^{
-      mux.consumer_queue.assertExecuting()
-      for( value <- source.getData ) {
-        if( overflow.isEmpty ) {
-          mux.overflowed_sessions.addLast(overflow_node);
-        }
-        overflow.add(value._1)
-        val size = sizer.size(value._1)
-        mux.overflow_size += size
-        if (value._2) {
-          stall_counter += 1
-        }
+  val overflow = new LinkedList[T]()
+  val overflow_node = SessionLinkedNode[T](Session.this)
+  var stall_counter = 0
+  var size_bonus = 0
+
+  // use a event aggregating source to coalesce multiple events from the same thread.
+  val source = createSource(new ListEventAggregator[(T, Boolean)](), mux.consumer_queue)
+  source.setEventHandler(^{
+    for( (value, stalled) <- source.getData ) {
+      if( overflow.isEmpty ) {
+        mux.overflowed_sessions.addLast(overflow_node);
       }
-      mux.drain_source.merge(0x02)
-    });
-    source.resume
-
-  }
+      overflow.add(value)
+      if (stalled) {
+        stall_counter += 1
+      }
+    }
+    mux.drain_overflow
+  });
+  source.resume
 
   // the rest of the Session fields are mutated from the producer dispatch queue...
   var refiller:Task = NOOP
@@ -603,7 +561,7 @@ class Session[T](mux:SessionSinkMux[T], 
   
         add_credits(-1, -size)
         val stalled = size_credits <= 0 || delivery_credits<=0
-        consumer_side.source.merge((value, stalled))
+        source.merge((value, stalled))
       }
       true
     }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1387818&r1=1387817&r2=1387818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Sep 19 23:44:10 2012
@@ -832,7 +832,7 @@ class OpenwireProtocolHandler extends Pr
 
     credit_window_filter.credit(info.getPrefetchSize, 0)
 
-    val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery, info.getCurrentPrefetchSize.max(1), Integer.MAX_VALUE) {
+    val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery, info.getCurrentPrefetchSize.max(1), Integer.MAX_VALUE/2) {
       override def time_stamp = broker.now
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml?rev=1387818&r1=1387817&r2=1387818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl-secure.xml Wed Sep 19 23:44:10 2012
@@ -31,7 +31,7 @@
 
   <key_storage file="${basedir}/src/test/resources/apollo.ks" password="password" key_password="password"/>
 
-  <web_admin bind="http://0.0.0.0:0"/>
+  <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="ssl" bind="ssl://0.0.0.0:0" />
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml?rev=1387818&r1=1387817&r2=1387818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-ssl.xml Wed Sep 19 23:44:10 2012
@@ -24,7 +24,7 @@
 
   <key_storage file="${basedir}/src/test/resources/apollo.ks" password="password" key_password="password"/>
 
-  <web_admin bind="http://0.0.0.0:0"/>
+  <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="ssl" bind="ssl://0.0.0.0:0" />
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml?rev=1387818&r1=1387817&r2=1387818&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml Wed Sep 19 23:44:10 2012
@@ -23,7 +23,7 @@
       <host_name>localhost</host_name>
   </virtual_host>
 
-  <web_admin bind="http://0.0.0.0:0"/>
+  <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
 
 </broker>
\ No newline at end of file