You are viewing a plain text version of this content. (The canonical link to the original page was not preserved in this plain-text copy.)
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/16 20:38:53 UTC
svn commit: r1158414 - in
/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire:
OpenwireMessage.scala OpenwireProtocolHandler.scala
Author: chirino
Date: Tue Aug 16 18:38:53 2011
New Revision: 1158414
URL: http://svn.apache.org/viewvc?rev=1158414&view=rev
Log:
Implemented consumer prefetch handling.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1158414&r1=1158413&r2=1158414&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala Tue Aug 16 18:38:53 2011
@@ -23,6 +23,7 @@ import org.fusesource.hawtbuf.Buffer._
import OpenwireConstants._
import org.fusesource.hawtbuf.{UTF8Buffer, AsciiBuffer, Buffer}
import command.{ActiveMQBytesMessage, ActiveMQTextMessage, ActiveMQMessage}
+import org.apache.activemq.apollo.broker.protocol.Protocol
/**
* <p>
@@ -83,4 +84,19 @@ class OpenwireMessage(val message:Active
}).asInstanceOf[T]
}
+}
+
+object EndOfBrowseMessage extends Message {
+ def retained(): Int = 0
+ def retain() {}
+ def release() {}
+ def protocol: Protocol = null
+ def producer: AsciiBuffer = null
+ def priority: Byte = 0
+ def persistent: Boolean = false
+ def id: AsciiBuffer = null
+ def expiration: Long = 0L
+ def getProperty(name: String): AnyRef = null
+ def getLocalConnectionId: AnyRef = null
+ def getBodyAs[T](toType : Class[T]) = null.asInstanceOf[T]
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1158414&r1=1158413&r2=1158414&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Aug 16 18:38:53 2011
@@ -73,8 +73,8 @@ class OpenwireProtocolHandler extends Pr
def protocol = PROTOCOL
- var outbound_sessions: SessionSinkMux[Command] = null
- var connection_session: Sink[Command] = null
+ var sink_manager:SinkMux[Command] = null
+ var connection_session:Sink[Command] = null
var closed = false
var last_command_id=0
@@ -177,14 +177,13 @@ class OpenwireProtocolHandler extends Pr
override def on_transport_connected():Unit = {
security_context.local_address = connection.transport.getLocalAddress
security_context.remote_address = connection.transport.getRemoteAddress
- outbound_sessions = new SessionSinkMux[Command](connection.transport_sink.map {
- x:Command =>
- x.setCommandId(next_command_id)
- debug("sending openwire command: %s", x.toString())
- x
- }, dispatchQueue, OpenwireCodec)
- connection_session = new OverflowSink(outbound_sessions.open(dispatchQueue));
- connection_session.refiller = NOOP
+
+ sink_manager = new SinkMux[Command]( connection.transport_sink.map {x=>
+ x.setCommandId(next_command_id)
+ debug("sending openwire command: %s", x.toString())
+ x
+ })
+ connection_session = new OverflowSink(sink_manager.open());
// Send our preferred wire format settings..
connection.transport.offer(preferred_wireformat_settings)
@@ -617,12 +616,13 @@ class OpenwireProtocolHandler extends Pr
def on_message_ack(info:MessageAck) = {
val consumer = all_consumers.get(info.getConsumerId).getOrElse(die("Cannot ack a message on a consumer that had not been registered."))
+ consumer.ack_handler.credit(info)
info.getTransactionId match {
case null =>
- consumer.ack_handler(info)
+ consumer.ack_handler.perform_ack(info)
case txid =>
get_or_create_tx_ctx(consumer.parent.parent, txid){ (uow)=>
- consumer.ack_handler(info, uow)
+ consumer.ack_handler.perform_ack(info, uow)
}
}
ack(info)
@@ -702,6 +702,26 @@ class OpenwireProtocolHandler extends Pr
var selector_expression:BooleanExpression = _
var destination:Array[DestinationDTO] = _
+ val consumer_sink = sink_manager.open()
+ val credit_window_filter = new CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
+ val dispatch = new MessageDispatch
+ dispatch.setConsumerId(info.getConsumerId)
+ if( delivery.message eq EndOfBrowseMessage ) {
+ // Then send the end of browse message.
+ dispatch
+ } else {
+ var msg = delivery.message.asInstanceOf[OpenwireMessage].message
+ ack_handler.track(msg.getMessageId, delivery.ack)
+ dispatch.setDestination(msg.getDestination)
+ dispatch.setMessage(msg)
+ }
+ dispatch
+ }, Delivery)
+
+ credit_window_filter.credit(0, info.getPrefetchSize)
+
+ val session_manager = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery)
+
override def exclusive = info.isExclusive
override def browser = info.isBrowser
@@ -751,7 +771,7 @@ class OpenwireProtocolHandler extends Pr
ack(info)
noop
case Some(reason) =>
- fail(reason, info)
+ async_fail(reason, info)
noop
}
}
@@ -773,6 +793,7 @@ class OpenwireProtocolHandler extends Pr
override def connection = Some(OpenwireProtocolHandler.this.connection)
def is_persistent = false
+ override def receive_buffer_size = codec.write_buffer_size
def matches(delivery:Delivery) = {
if( delivery.message.protocol eq OpenwireProtocol ) {
@@ -786,107 +807,231 @@ class OpenwireProtocolHandler extends Pr
}
}
- def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter[Command] {
+ def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter[Delivery] {
retain
def producer = p
def consumer = ConsumerContext.this
var closed = false
- val outbound_session = outbound_sessions.open(producer.dispatch_queue)
-
- def downstream = outbound_session
+ val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
+ def remaining_capacity = downstream.remaining_capacity
def close = {
-
assert(producer.dispatch_queue.isExecuting)
if( !closed ) {
closed = true
if( browser ) {
- // Then send the end of browse message.
- var dispatch = new MessageDispatch
- dispatch.setConsumerId(this.consumer.info.getConsumerId)
- dispatch.setMessage(null)
- dispatch.setDestination(null)
- if( outbound_session.full ) {
+ val delivery = new Delivery()
+ delivery.message = EndOfBrowseMessage
+
+ if( downstream.full ) {
// session is full so use an overflow sink so to hold the message,
// and then trigger closing the session once it empties out.
- val sink = new OverflowSink(outbound_session)
+ val sink = new OverflowSink(downstream)
sink.refiller = ^{
- outbound_sessions.close(outbound_session)
- release
+ dispose
}
- sink.offer(dispatch)
+ sink.offer(delivery)
} else {
- outbound_session.offer(dispatch)
- outbound_sessions.close(outbound_session)
- release
+ downstream.offer(delivery)
+ dispose
}
} else {
- outbound_sessions.close(outbound_session)
- release
+ dispose
}
}
}
-
- def remaining_capacity = outbound_session.remaining_capacity
+// def close = {
+//
+// assert(producer.dispatch_queue.isExecuting)
+// if( !closed ) {
+// closed = true
+// if( browser ) {
+// // Then send the end of browse message.
+// var dispatch = new MessageDispatch
+// dispatch.setConsumerId(this.consumer.info.getConsumerId)
+// dispatch.setMessage(null)
+// dispatch.setDestination(null)
+//
+// if( downstream.full ) {
+// // session is full so use an overflow sink so to hold the message,
+// // and then trigger closing the session once it empties out.
+// val sink = new OverflowSink(downstream)
+// sink.refiller = ^{
+// outbound_sessions.close(downstream)
+// release
+// }
+// sink.offer(dispatch)
+// } else {
+// downstream.offer(dispatch)
+// outbound_sessions.close(downstream)
+// release
+// }
+// } else {
+// outbound_sessions.close(downstream)
+// release
+// }
+// }
+// }
+
+ def dispose = {
+ session_manager.close(downstream)
+// if( auto_delete ) {
+// reset {
+// val rc = host.router.delete(destination, security_context)
+// rc match {
+// case Some(error) =>
+// async_die(error)
+// case None =>
+// unit
+// }
+// }
+// }
+ release
+ }
// Delegate all the flow control stuff to the session
def offer(delivery:Delivery) = {
- if( outbound_session.full ) {
+ if( full ) {
false
} else {
- var msg = delivery.message.asInstanceOf[OpenwireMessage].message
- ack_handler.track(msg.getMessageId, delivery.ack)
- val dispatch = new MessageDispatch
- dispatch.setConsumerId(info.getConsumerId)
- dispatch.setDestination(msg.getDestination)
- dispatch.setMessage(msg)
-
- val rc = outbound_session.offer(dispatch)
+ delivery.message.retain()
+ val rc = downstream.offer(delivery)
assert(rc, "offer should be accepted since it was not full")
true
}
}
}
+ class TrackedAck(val ack:(DeliveryResult, StoreUOW)=>Unit) {
+ var credited = false
+ }
+
+ val ack_source = createSource(EventAggregators.INTEGER_ADD, dispatch_queue)
+ ack_source.setEventHandler(^ {
+ val data = ack_source.getData
+ credit_window_filter.credit(0, data)
+ });
+ ack_source.resume
+
object ack_handler {
// TODO: Need to validate all the range ack cases...
- var consumer_acks = ListBuffer[(MessageId, (DeliveryResult, StoreUOW)=>Unit)]()
+ var consumer_acks = ListBuffer[(MessageId,TrackedAck)]()
- def track(id:MessageId, callback:(DeliveryResult, StoreUOW)=>Unit) = {
- queue {
- consumer_acks += (( id, callback ))
- }
+ def track(msgid:MessageId, ack:(DeliveryResult, StoreUOW)=>Unit) = {
+ queue.assertExecuting()
+ consumer_acks += msgid -> new TrackedAck(ack)
}
- def apply(messageAck: MessageAck, uow:StoreUOW=null) = {
-
- var found = false
- val (acked, not_acked) = consumer_acks.partition{ case (id, _)=>
- if( found ) {
- false
- } else {
- if( id == messageAck.getLastMessageId ) {
+ def credit(messageAck: MessageAck):Unit = {
+ queue.assertExecuting()
+ val msgid: MessageId = messageAck.getLastMessageId
+ if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
+ for( (id, delivery) <- consumer_acks.find(_._1 == msgid) ) {
+ if ( !delivery.credited ) {
+ ack_source.merge(1)
+ delivery.credited = true;
+ }
+ }
+ } else {
+ var found = false
+ val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+ if( id == msgid ) {
found = true
+ true
+ } else {
+ !found
+ }
+ }
+
+ for( (id, delivery) <- acked ) {
+ // only credit once...
+ if( !delivery.credited ) {
+ ack_source.merge(1)
+ delivery.credited = true;
}
- true
}
}
+ }
+
+ def perform_ack(messageAck: MessageAck, uow:StoreUOW=null) = {
+ queue.assertExecuting()
+
+ val msgid = messageAck.getLastMessageId
+ val consumed = messageAck.getAckType match {
+ case MessageAck.DELIVERED_ACK_TYPE => Delivered
+ case MessageAck.INDIVIDUAL_ACK_TYPE => Delivered
+ case MessageAck.STANDARD_ACK_TYPE => Delivered
+ case MessageAck.POSION_ACK_TYPE => Poisoned
+ case MessageAck.REDELIVERED_ACK_TYPE => Undelivered
+ case MessageAck.UNMATCHED_ACK_TYPE => Delivered
+ }
- if( acked.isEmpty ) {
- async_fail("ACK failed, invalid message id: %s".format(messageAck.getLastMessageId), messageAck)
+ if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
+ consumer_acks = consumer_acks.filterNot{ case (id, delivery)=>
+ if( id == msgid) {
+ if( delivery.ack!=null ) {
+ delivery.ack(consumed, uow)
+ }
+ true
+ } else {
+ false
+ }
+ }
} else {
- consumer_acks = not_acked
- acked.foreach{case (_, callback)=>
- if( callback!=null ) {
- callback(Delivered, uow)
+ // session acks ack all previously received messages..
+ var found = false
+ val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+ if( id == msgid ) {
+ found = true
+ true
+ } else {
+ !found
+ }
+ }
+
+ if( !found ) {
+ trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, destination.mkString(",")))
+ } else {
+ consumer_acks = not_acked
+ acked.foreach{case (id, delivery)=>
+ if( delivery.ack!=null ) {
+ delivery.ack(consumed, uow)
+ }
}
}
}
+
}
+//
+// def apply(messageAck: MessageAck, uow:StoreUOW=null) = {
+//
+// var found = false
+// val (acked, not_acked) = consumer_acks.partition{ case (id, _)=>
+// if( found ) {
+// false
+// } else {
+// if( id == messageAck.getLastMessageId ) {
+// found = true
+// }
+// true
+// }
+// }
+//
+// if( acked.isEmpty ) {
+// async_fail("ACK failed, invalid message id: %s".format(messageAck.getLastMessageId), messageAck)
+// } else {
+// consumer_acks = not_acked
+// acked.foreach{case (_, callback)=>
+// if( callback!=null ) {
+// callback(Delivered, uow)
+// }
+// }
+// }
+// }
}
}