You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/30 22:41:46 UTC

svn commit: r1367282 - /activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Author: chirino
Date: Mon Jul 30 20:41:46 2012
New Revision: 1367282

URL: http://svn.apache.org/viewvc?rev=1367282&view=rev
Log:
Tighten up queue prefetching for openwire.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1367282&r1=1367281&r2=1367282&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Mon Jul 30 20:41:46 2012
@@ -811,7 +811,6 @@ class OpenwireProtocolHandler extends Pr
     val consumer_sink = sink_manager.open()
     val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.map { event =>
       val (session, delivery) = event
-      session_manager.delivered(session, delivery.size)
       val dispatch = new MessageDispatch
       dispatch.setConsumerId(info.getConsumerId)
       if( delivery.message eq EndOfBrowseMessage ) {
@@ -819,7 +818,7 @@ class OpenwireProtocolHandler extends Pr
         dispatch
       } else {
         var msg = delivery.message.asInstanceOf[OpenwireMessage].message
-        ack_handler.track(msg.getMessageId, delivery.ack)
+        ack_handler.track(msg.getMessageId, delivery.ack, session, delivery.size)
         dispatch.setDestination(msg.getDestination)
         dispatch.setMessage(msg)
       }
@@ -1015,7 +1014,7 @@ class OpenwireProtocolHandler extends Pr
 
     def connect(p:DeliveryProducer) = new OpenwireConsumerSession(p)
 
-    class TrackedAck(val ack:(DeliveryResult, StoreUOW)=>Unit) {
+    class TrackedAck(val ack:(DeliveryResult, StoreUOW)=>Unit, val session:Session[Delivery], val size:Int) {
       var credited = false
     }
 
@@ -1041,7 +1040,7 @@ class OpenwireProtocolHandler extends Pr
         consumer_acks = null
       }
 
-      def track(msgid:MessageId, ack:(DeliveryResult, StoreUOW)=>Unit) = {
+      def track(msgid:MessageId, ack:(DeliveryResult, StoreUOW)=>Unit, session:Session[Delivery], size:Int) = {
         queue.assertExecuting()
         if( consumer_acks==null ) {
           // It can happen if we get closed.. but destination is still sending data..
@@ -1049,7 +1048,7 @@ class OpenwireProtocolHandler extends Pr
             ack(Undelivered, null)
           }
         } else {
-          consumer_acks += msgid -> new TrackedAck(ack)
+          consumer_acks += msgid -> new TrackedAck(ack, session, size)
         }
       }
 
@@ -1059,6 +1058,7 @@ class OpenwireProtocolHandler extends Pr
         if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
           for( (id, delivery) <- consumer_acks.find(_._1 == msgid) ) {
             if ( !delivery.credited ) {
+              session_manager.delivered(delivery.session, delivery.size)
               ack_source.merge(1)
               delivery.credited = true;
             }
@@ -1077,6 +1077,7 @@ class OpenwireProtocolHandler extends Pr
           for( (id, delivery) <- acked ) {
             // only credit once...
             if( !delivery.credited ) {
+              session_manager.delivered(delivery.session, delivery.size)
               ack_source.merge(1)
               delivery.credited = true;
             }