Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/16 20:38:53 UTC

svn commit: r1158414 - in /activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire: OpenwireMessage.scala OpenwireProtocolHandler.scala

Author: chirino
Date: Tue Aug 16 18:38:53 2011
New Revision: 1158414

URL: http://svn.apache.org/viewvc?rev=1158414&view=rev
Log:
Implemented consumer prefetch handling.
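
The prefetch handling below rests on credit-window flow control: each
consumer starts with credit equal to the OpenWire prefetch size, a
CreditWindowFilter gates dispatches against that credit, and every
MessageAck merges credit back in through an ack_source event, releasing
further deliveries. A minimal, self-contained sketch of the idea only
(CreditWindow and its methods are illustrative stand-ins, not Apollo's
actual API):

    import scala.collection.mutable

    // Simplified credit window: dispatch while credit remains, queue
    // the rest, and drain the queue as acks return credit.
    class CreditWindow(initialCredits: Int) {
      private var credits = initialCredits
      private val pending = mutable.Queue[String]()

      def offer(msg: String)(send: String => Unit): Unit =
        if (credits > 0) { credits -= 1; send(msg) }
        else pending.enqueue(msg)

      def credit(n: Int)(send: String => Unit): Unit = {
        credits += n
        while (credits > 0 && pending.nonEmpty) {
          credits -= 1
          send(pending.dequeue())
        }
      }
    }

    object PrefetchDemo extends App {
      val window = new CreditWindow(2)                      // prefetch size 2
      val send = (m: String) => println(s"dispatched $m")
      Seq("m1", "m2", "m3").foreach(m => window.offer(m)(send)) // m3 is held
      window.credit(1)(send)                                // ack of m1 frees m3
    }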

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1158414&r1=1158413&r2=1158414&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala Tue Aug 16 18:38:53 2011
@@ -23,6 +23,7 @@ import org.fusesource.hawtbuf.Buffer._
 import OpenwireConstants._
 import org.fusesource.hawtbuf.{UTF8Buffer, AsciiBuffer, Buffer}
 import command.{ActiveMQBytesMessage, ActiveMQTextMessage, ActiveMQMessage}
+import org.apache.activemq.apollo.broker.protocol.Protocol
 
 /**
  * <p>
@@ -83,4 +84,19 @@ class OpenwireMessage(val message:Active
     }).asInstanceOf[T]
   }
 
+}
+
+object EndOfBrowseMessage extends Message {
+  def retained(): Int = 0
+  def retain() {}
+  def release() {}
+  def protocol: Protocol = null
+  def producer: AsciiBuffer = null
+  def priority: Byte = 0
+  def persistent: Boolean = false
+  def id: AsciiBuffer = null
+  def expiration: Long = 0L
+  def getProperty(name: String): AnyRef = null
+  def getLocalConnectionId: AnyRef = null
+  def getBodyAs[T](toType : Class[T]) = null.asInstanceOf[T]
 }
\ No newline at end of file
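
The EndOfBrowseMessage singleton added above is a null-object sentinel:
it travels through the same Delivery/Sink plumbing as real messages, and
the protocol handler recognizes it by reference identity (eq) so it can
emit an empty MessageDispatch telling the client that a queue browse has
completed. A minimal sketch of the pattern (the names here are
illustrative, not Apollo's types):

    object BrowseDemo extends App {
      trait Msg
      object EndOfBrowse extends Msg           // shared sentinel instance
      case class Text(body: String) extends Msg

      // Reference identity (eq) spots the sentinel without inspecting
      // message contents, mirroring the 'delivery.message eq
      // EndOfBrowseMessage' check in the handler below.
      def handle(m: Msg): String =
        if (m eq EndOfBrowse) "browse complete" else s"deliver: $m"

      println(handle(Text("hi")))      // deliver: Text(hi)
      println(handle(EndOfBrowse))     // browse complete
    }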

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1158414&r1=1158413&r2=1158414&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Aug 16 18:38:53 2011
@@ -73,8 +73,8 @@ class OpenwireProtocolHandler extends Pr
 
   def protocol = PROTOCOL
 
-  var outbound_sessions: SessionSinkMux[Command] = null
-  var connection_session: Sink[Command] = null
+  var sink_manager:SinkMux[Command] = null
+  var connection_session:Sink[Command] = null
   var closed = false
 
   var last_command_id=0
@@ -177,14 +177,13 @@ class OpenwireProtocolHandler extends Pr
   override def on_transport_connected():Unit = {
     security_context.local_address = connection.transport.getLocalAddress
     security_context.remote_address = connection.transport.getRemoteAddress
-    outbound_sessions = new SessionSinkMux[Command](connection.transport_sink.map {
-      x:Command =>
-        x.setCommandId(next_command_id)
-        debug("sending openwire command: %s", x.toString())
-        x
-    }, dispatchQueue, OpenwireCodec)
-    connection_session = new OverflowSink(outbound_sessions.open(dispatchQueue));
-    connection_session.refiller = NOOP
+
+    sink_manager = new SinkMux[Command]( connection.transport_sink.map {x=>
+      x.setCommandId(next_command_id)
+      debug("sending openwire command: %s", x.toString())
+      x
+    })
+    connection_session = new OverflowSink(sink_manager.open());
 
     // Send our preferred wire format settings..
     connection.transport.offer(preferred_wireformat_settings)
@@ -617,12 +616,13 @@ class OpenwireProtocolHandler extends Pr
 
   def on_message_ack(info:MessageAck) = {
    val consumer = all_consumers.get(info.getConsumerId).getOrElse(die("Cannot ack a message on a consumer that has not been registered."))
+    consumer.ack_handler.credit(info)
     info.getTransactionId match {
       case null =>
-        consumer.ack_handler(info)
+        consumer.ack_handler.perform_ack(info)
       case txid =>
         get_or_create_tx_ctx(consumer.parent.parent, txid){ (uow)=>
-          consumer.ack_handler(info, uow)
+          consumer.ack_handler.perform_ack(info, uow)
         }
     }
     ack(info)
@@ -702,6 +702,26 @@ class OpenwireProtocolHandler extends Pr
     var selector_expression:BooleanExpression = _
     var destination:Array[DestinationDTO] = _
 
+    val consumer_sink = sink_manager.open()
+    val credit_window_filter = new CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
+      val dispatch = new MessageDispatch
+      dispatch.setConsumerId(info.getConsumerId)
+      if( delivery.message eq EndOfBrowseMessage ) {
+        // An empty dispatch (no message or destination set) signals the end of the browse.
+        dispatch
+      } else {
+        var msg = delivery.message.asInstanceOf[OpenwireMessage].message
+        ack_handler.track(msg.getMessageId, delivery.ack)
+        dispatch.setDestination(msg.getDestination)
+        dispatch.setMessage(msg)
+      }
+      dispatch
+    }, Delivery)
+
+    credit_window_filter.credit(0, info.getPrefetchSize)
+
+    val session_manager = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery)
+
     override def exclusive = info.isExclusive
     override def browser = info.isBrowser
 
@@ -751,7 +771,7 @@ class OpenwireProtocolHandler extends Pr
             ack(info)
             noop
           case Some(reason) =>
-            fail(reason, info)
+            async_fail(reason, info)
             noop
         }
       }
@@ -773,6 +793,7 @@ class OpenwireProtocolHandler extends Pr
     override def connection = Some(OpenwireProtocolHandler.this.connection)
 
     def is_persistent = false
+    override def receive_buffer_size = codec.write_buffer_size
 
     def matches(delivery:Delivery) = {
       if( delivery.message.protocol eq OpenwireProtocol ) {
@@ -786,107 +807,231 @@ class OpenwireProtocolHandler extends Pr
       }
     }
 
-    def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter[Command] {
+    def connect(p:DeliveryProducer) = new DeliverySession with SinkFilter[Delivery] {
       retain
 
       def producer = p
       def consumer = ConsumerContext.this
       var closed = false
 
-      val outbound_session = outbound_sessions.open(producer.dispatch_queue)
-
-      def downstream = outbound_session
+      val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
+      def remaining_capacity = downstream.remaining_capacity
 
       def close = {
-
         assert(producer.dispatch_queue.isExecuting)
         if( !closed ) {
           closed = true
           if( browser ) {
-            // Then send the end of browse message.
-            var dispatch = new MessageDispatch
-            dispatch.setConsumerId(this.consumer.info.getConsumerId)
-            dispatch.setMessage(null)
-            dispatch.setDestination(null)
 
-            if( outbound_session.full ) {
+            val delivery = new Delivery()
+            delivery.message = EndOfBrowseMessage
+
+            if( downstream.full ) {
              // session is full, so use an overflow sink to hold the message,
               // and then trigger closing the session once it empties out.
-              val sink = new OverflowSink(outbound_session)
+              val sink = new OverflowSink(downstream)
               sink.refiller = ^{
-                outbound_sessions.close(outbound_session)
-                release
+                dispose
               }
-              sink.offer(dispatch)
+              sink.offer(delivery)
             } else {
-              outbound_session.offer(dispatch)
-              outbound_sessions.close(outbound_session)
-              release
+              downstream.offer(delivery)
+              dispose
             }
           } else {
-            outbound_sessions.close(outbound_session)
-            release
+            dispose
           }
         }
       }
-
-      def remaining_capacity = outbound_session.remaining_capacity
+//      def close = {
+//
+//        assert(producer.dispatch_queue.isExecuting)
+//        if( !closed ) {
+//          closed = true
+//          if( browser ) {
+//            // Then send the end of browse message.
+//            var dispatch = new MessageDispatch
+//            dispatch.setConsumerId(this.consumer.info.getConsumerId)
+//            dispatch.setMessage(null)
+//            dispatch.setDestination(null)
+//
+//            if( downstream.full ) {
+//              // session is full so use an overflow sink so to hold the message,
+//              // and then trigger closing the session once it empties out.
+//              val sink = new OverflowSink(downstream)
+//              sink.refiller = ^{
+//                outbound_sessions.close(downstream)
+//                release
+//              }
+//              sink.offer(dispatch)
+//            } else {
+//              downstream.offer(dispatch)
+//              outbound_sessions.close(downstream)
+//              release
+//            }
+//          } else {
+//            outbound_sessions.close(downstream)
+//            release
+//          }
+//        }
+//      }
+
+      def dispose = {
+        session_manager.close(downstream)
+//        if( auto_delete ) {
+//          reset {
+//            val rc = host.router.delete(destination, security_context)
+//            rc match {
+//              case Some(error) =>
+//                async_die(error)
+//              case None =>
+//                unit
+//            }
+//          }
+//        }
+        release
+      }
 
       // Delegate all the flow control stuff to the session
       def offer(delivery:Delivery) = {
-        if( outbound_session.full ) {
+        if( full ) {
           false
         } else {
-          var msg = delivery.message.asInstanceOf[OpenwireMessage].message
-          ack_handler.track(msg.getMessageId, delivery.ack)
-          val dispatch = new MessageDispatch
-          dispatch.setConsumerId(info.getConsumerId)
-          dispatch.setDestination(msg.getDestination)
-          dispatch.setMessage(msg)
-
-          val rc = outbound_session.offer(dispatch)
+          delivery.message.retain()
+          val rc = downstream.offer(delivery)
           assert(rc, "offer should be accepted since it was not full")
           true
         }
       }
     }
 
+    class TrackedAck(val ack:(DeliveryResult, StoreUOW)=>Unit) {
+      var credited = false
+    }
+
+    val ack_source = createSource(EventAggregators.INTEGER_ADD, dispatch_queue)
+    ack_source.setEventHandler(^ {
+      val data = ack_source.getData
+      credit_window_filter.credit(0, data)
+    });
+    ack_source.resume
+
     object ack_handler {
 
       // TODO: Need to validate all the range ack cases...
-      var consumer_acks = ListBuffer[(MessageId, (DeliveryResult, StoreUOW)=>Unit)]()
+      var consumer_acks = ListBuffer[(MessageId,TrackedAck)]()
 
-      def track(id:MessageId, callback:(DeliveryResult, StoreUOW)=>Unit) = {
-        queue {
-          consumer_acks += (( id, callback ))
-        }
+      def track(msgid:MessageId, ack:(DeliveryResult, StoreUOW)=>Unit) = {
+        queue.assertExecuting()
+        consumer_acks += msgid -> new TrackedAck(ack)
       }
 
-      def apply(messageAck: MessageAck, uow:StoreUOW=null) = {
-
-        var found = false
-        val (acked, not_acked) = consumer_acks.partition{ case (id, _)=>
-          if( found ) {
-            false
-          } else {
-            if( id == messageAck.getLastMessageId ) {
+      def credit(messageAck: MessageAck):Unit = {
+        queue.assertExecuting()
+        val msgid: MessageId = messageAck.getLastMessageId
+        if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
+          for( (id, delivery) <- consumer_acks.find(_._1 == msgid) ) {
+            if ( !delivery.credited ) {
+              ack_source.merge(1)
+              delivery.credited = true;
+            }
+          }
+        } else {
+          var found = false
+          val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+            if( id == msgid ) {
               found = true
+              true
+            } else {
+              !found
+            }
+          }
+
+          for( (id, delivery) <- acked ) {
+            // only credit once...
+            if( !delivery.credited ) {
+              ack_source.merge(1)
+              delivery.credited = true;
             }
-            true
           }
         }
+      }
+
+      def perform_ack(messageAck: MessageAck, uow:StoreUOW=null) = {
+        queue.assertExecuting()
+
+        val msgid = messageAck.getLastMessageId
+        val consumed = messageAck.getAckType match {
+          case MessageAck.DELIVERED_ACK_TYPE => Delivered
+          case MessageAck.INDIVIDUAL_ACK_TYPE => Delivered
+          case MessageAck.STANDARD_ACK_TYPE => Delivered
+          case MessageAck.POSION_ACK_TYPE => Poisoned
+          case MessageAck.REDELIVERED_ACK_TYPE => Undelivered
+          case MessageAck.UNMATCHED_ACK_TYPE => Delivered
+        }
 
-        if( acked.isEmpty ) {
-          async_fail("ACK failed, invalid message id: %s".format(messageAck.getLastMessageId), messageAck)
+        if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
+          consumer_acks = consumer_acks.filterNot{ case (id, delivery)=>
+            if( id == msgid) {
+              if( delivery.ack!=null ) {
+                delivery.ack(consumed, uow)
+              }
+              true
+            } else {
+              false
+            }
+          }
         } else {
-          consumer_acks = not_acked
-          acked.foreach{case (_, callback)=>
-            if( callback!=null ) {
-              callback(Delivered, uow)
+          // session acks ack all previously received messages..
+          var found = false
+          val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+            if( id == msgid ) {
+              found = true
+              true
+            } else {
+              !found
+            }
+          }
+
+          if( !found ) {
+            trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address, msgid, destination.mkString(",")))
+          } else {
+            consumer_acks = not_acked
+            acked.foreach{case (id, delivery)=>
+              if( delivery.ack!=null ) {
+                delivery.ack(consumed, uow)
+              }
             }
           }
         }
+
       }
+//
+//      def apply(messageAck: MessageAck, uow:StoreUOW=null) = {
+//
+//        var found = false
+//        val (acked, not_acked) = consumer_acks.partition{ case (id, _)=>
+//          if( found ) {
+//            false
+//          } else {
+//            if( id == messageAck.getLastMessageId ) {
+//              found = true
+//            }
+//            true
+//          }
+//        }
+//
+//        if( acked.isEmpty ) {
+//          async_fail("ACK failed, invalid message id: %s".format(messageAck.getLastMessageId), messageAck)
+//        } else {
+//          consumer_acks = not_acked
+//          acked.foreach{case (_, callback)=>
+//            if( callback!=null ) {
+//              callback(Delivered, uow)
+//            }
+//          }
+//        }
+//      }
     }
   }
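
The reworked ack_handler distinguishes two OpenWire ack modes:
INDIVIDUAL_ACK_TYPE settles exactly one tracked message, while the
other ack types settle every tracked message up to and including
getLastMessageId (a range ack). A sketch of just that partition logic
in isolation (names are illustrative, not Apollo's API):

    import scala.collection.mutable.ListBuffer

    object AckDemo extends App {
      var tracked = ListBuffer("m1", "m2", "m3", "m4")

      // Individual ack: settle only the matching message id.
      def individualAck(id: String): Unit =
        tracked = tracked.filterNot(_ == id)

      // Range ack: settle every id up to and including lastId,
      // mirroring the partition in perform_ack above.
      def rangeAck(lastId: String): Unit = {
        var found = false
        val (acked, rest) = tracked.partition { id =>
          if (id == lastId) { found = true; true } else !found
        }
        if (found) tracked = rest   // ids in 'acked' are settled
      }

      individualAck("m2"); println(tracked)  // ListBuffer(m1, m3, m4)
      rangeAck("m3");      println(tracked)  // ListBuffer(m4)
    }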