You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/30 22:41:46 UTC
svn commit: r1367282 -
/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Author: chirino
Date: Mon Jul 30 20:41:46 2012
New Revision: 1367282
URL: http://svn.apache.org/viewvc?rev=1367282&view=rev
Log:
Tighten up queue prefetching for openwire.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1367282&r1=1367281&r2=1367282&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Mon Jul 30 20:41:46 2012
@@ -811,7 +811,6 @@ class OpenwireProtocolHandler extends Pr
val consumer_sink = sink_manager.open()
val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.map { event =>
val (session, delivery) = event
- session_manager.delivered(session, delivery.size)
val dispatch = new MessageDispatch
dispatch.setConsumerId(info.getConsumerId)
if( delivery.message eq EndOfBrowseMessage ) {
@@ -819,7 +818,7 @@ class OpenwireProtocolHandler extends Pr
dispatch
} else {
var msg = delivery.message.asInstanceOf[OpenwireMessage].message
- ack_handler.track(msg.getMessageId, delivery.ack)
+ ack_handler.track(msg.getMessageId, delivery.ack, session, delivery.size)
dispatch.setDestination(msg.getDestination)
dispatch.setMessage(msg)
}
@@ -1015,7 +1014,7 @@ class OpenwireProtocolHandler extends Pr
def connect(p:DeliveryProducer) = new OpenwireConsumerSession(p)
- class TrackedAck(val ack:(DeliveryResult, StoreUOW)=>Unit) {
+ class TrackedAck(val ack:(DeliveryResult, StoreUOW)=>Unit, val session:Session[Delivery], val size:Int) {
var credited = false
}
@@ -1041,7 +1040,7 @@ class OpenwireProtocolHandler extends Pr
consumer_acks = null
}
- def track(msgid:MessageId, ack:(DeliveryResult, StoreUOW)=>Unit) = {
+ def track(msgid:MessageId, ack:(DeliveryResult, StoreUOW)=>Unit, session:Session[Delivery], size:Int) = {
queue.assertExecuting()
if( consumer_acks==null ) {
// It can happen if we get closed.. but destination is still sending data..
@@ -1049,7 +1048,7 @@ class OpenwireProtocolHandler extends Pr
ack(Undelivered, null)
}
} else {
- consumer_acks += msgid -> new TrackedAck(ack)
+ consumer_acks += msgid -> new TrackedAck(ack, session, size)
}
}
@@ -1059,6 +1058,7 @@ class OpenwireProtocolHandler extends Pr
if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
for( (id, delivery) <- consumer_acks.find(_._1 == msgid) ) {
if ( !delivery.credited ) {
+ session_manager.delivered(delivery.session, delivery.size)
ack_source.merge(1)
delivery.credited = true;
}
@@ -1077,6 +1077,7 @@ class OpenwireProtocolHandler extends Pr
for( (id, delivery) <- acked ) {
// only credit once...
if( !delivery.credited ) {
+ session_manager.delivered(delivery.session, delivery.size)
ack_source.merge(1)
delivery.credited = true;
}