You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/31 15:02:38 UTC

svn commit: r1367524 - in /activemq/activemq-apollo/trunk: apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Author: chirino
Date: Tue Jul 31 13:02:38 2012
New Revision: 1367524

URL: http://svn.apache.org/viewvc?rev=1367524&view=rev
Log:
Don't release the stomp or openwire consumers until they are closed by the client.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1367524&r1=1367523&r2=1367524&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Jul 31 13:02:38 2012
@@ -781,16 +781,24 @@ class OpenwireProtocolHandler extends Pr
 
   class ConsumerContext(val parent: SessionContext, val info: ConsumerInfo) extends BaseRetained with DeliveryConsumer {
 
-//  The following comes in handy if we need to debug the
-//  reference counts of the consumers.
-//    val r = new BaseRetained
+////  The following comes in handy if we need to debug the
+////  reference counts of the consumers.
+//  class ConsumerContext(val parent: SessionContext, val info: ConsumerInfo) extends Retained with DeliveryConsumer {
+//    val r = new BaseRetained {
+//      override def toString: String = info.getConsumerId.toString
+//    }
 //
-//    def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
-//    def retained: Int =r.retained
+//    var d = NOOP
+//    def setDisposer(p1: Task): Unit = d = p1
+//    r.setDisposer(^{
+//      dispose();
+//      d.run();
+//    })
+//    def retained: Int = r.retained
 //
-//    def printST(name:String) = {
+//    def printST(name:String) = System.out.synchronized {
 //      val e = new Exception
-//      println(name+": "+connection.map(_.id))
+//      println(name+": "+info.getConsumerId+" @ "+r.retained())
 //      println("  "+e.getStackTrace.drop(1).take(4).mkString("\n  "))
 //    }
 //
@@ -895,7 +903,6 @@ class OpenwireProtocolHandler extends Pr
 
       host.dispatch_queue {
         val rc = host.router.bind(addresses, this, security_context)
-        this.release
         dispatchQueue {
           rc match {
             case None =>
@@ -910,6 +917,7 @@ class OpenwireProtocolHandler extends Pr
     def dettach = {
       host.dispatch_queue {
         host.router.unbind(addresses, this, false , security_context)
+        this.release
       }
       parent.consumers.remove(info.getConsumerId)
       all_consumers.remove(info.getConsumerId)
@@ -942,7 +950,7 @@ class OpenwireProtocolHandler extends Pr
       producer.dispatch_queue.assertExecuting()
       retain
 
-      val downstream = session_manager.open(producer.dispatch_queue, info.getCurrentPrefetchSize.max(1), buffer_size)
+      val downstream = session_manager.open(producer.dispatch_queue, info.getCurrentPrefetchSize.max(1), Integer.MAX_VALUE)
       var closed = false
 
       def consumer = ConsumerContext.this
@@ -1059,9 +1067,9 @@ class OpenwireProtocolHandler extends Pr
         if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
           for( (id, delivery) <- consumer_acks.find(_._1 == msgid) ) {
             if ( !delivery.credited ) {
+              delivery.credited = true;
               session_manager.delivered(delivery.session, delivery.size)
               ack_source.merge(1)
-              delivery.credited = true;
             }
           }
         } else {
@@ -1078,9 +1086,9 @@ class OpenwireProtocolHandler extends Pr
           for( (id, delivery) <- acked ) {
             // only credit once...
             if( !delivery.credited ) {
+              delivery.credited = true;
               session_manager.delivered(delivery.session, delivery.size)
               ack_source.merge(1)
-              delivery.credited = true;
             }
           }
         }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1367524&r1=1367523&r2=1367524&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Jul 31 13:02:38 2012
@@ -789,6 +789,7 @@ class StompProtocolHandler extends Proto
         val addresses = consumer.addresses
         host.dispatch_queue {
           host.router.unbind(addresses, consumer, false , security_context)
+          consumer.release()
         }
       }
       consumers = Map()
@@ -1358,11 +1359,11 @@ class StompProtocolHandler extends Proto
 
     host.dispatch_queue {
       val rc = host.router.bind(addresses, consumer, security_context)
-      consumer.release
       dispatchQueue {
         rc match {
           case Some(reason)=>
             consumers -= id
+            consumer.release
             async_die(reason)
           case None =>
             send_receipt(headers)
@@ -1414,6 +1415,7 @@ class StompProtocolHandler extends Proto
         consumers -= id
         host.dispatch_queue {
           host.router.unbind(consumer.addresses, consumer, persistent, security_context)
+          consumer.release()
           send_receipt(headers)
         }
     }