You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/31 15:02:38 UTC
svn commit: r1367524 - in /activemq/activemq-apollo/trunk:
apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Author: chirino
Date: Tue Jul 31 13:02:38 2012
New Revision: 1367524
URL: http://svn.apache.org/viewvc?rev=1367524&view=rev
Log:
Don't release the stomp or openwire consumers until they are closed by the client.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1367524&r1=1367523&r2=1367524&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Jul 31 13:02:38 2012
@@ -781,16 +781,24 @@ class OpenwireProtocolHandler extends Pr
class ConsumerContext(val parent: SessionContext, val info: ConsumerInfo) extends BaseRetained with DeliveryConsumer {
-// The following comes in handy if we need to debug the
-// reference counts of the consumers.
-// val r = new BaseRetained
+//// The following comes in handy if we need to debug the
+//// reference counts of the consumers.
+// class ConsumerContext(val parent: SessionContext, val info: ConsumerInfo) extends Retained with DeliveryConsumer {
+// val r = new BaseRetained {
+// override def toString: String = info.getConsumerId.toString
+// }
//
-// def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
-// def retained: Int =r.retained
+// var d = NOOP
+// def setDisposer(p1: Task): Unit = d = p1
+// r.setDisposer(^{
+// dispose();
+// d.run();
+// })
+// def retained: Int = r.retained
//
-// def printST(name:String) = {
+// def printST(name:String) = System.out.synchronized {
// val e = new Exception
-// println(name+": "+connection.map(_.id))
+// println(name+": "+info.getConsumerId+" @ "+r.retained())
// println(" "+e.getStackTrace.drop(1).take(4).mkString("\n "))
// }
//
@@ -895,7 +903,6 @@ class OpenwireProtocolHandler extends Pr
host.dispatch_queue {
val rc = host.router.bind(addresses, this, security_context)
- this.release
dispatchQueue {
rc match {
case None =>
@@ -910,6 +917,7 @@ class OpenwireProtocolHandler extends Pr
def dettach = {
host.dispatch_queue {
host.router.unbind(addresses, this, false , security_context)
+ this.release
}
parent.consumers.remove(info.getConsumerId)
all_consumers.remove(info.getConsumerId)
@@ -942,7 +950,7 @@ class OpenwireProtocolHandler extends Pr
producer.dispatch_queue.assertExecuting()
retain
- val downstream = session_manager.open(producer.dispatch_queue, info.getCurrentPrefetchSize.max(1), buffer_size)
+ val downstream = session_manager.open(producer.dispatch_queue, info.getCurrentPrefetchSize.max(1), Integer.MAX_VALUE)
var closed = false
def consumer = ConsumerContext.this
@@ -1059,9 +1067,9 @@ class OpenwireProtocolHandler extends Pr
if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
for( (id, delivery) <- consumer_acks.find(_._1 == msgid) ) {
if ( !delivery.credited ) {
+ delivery.credited = true;
session_manager.delivered(delivery.session, delivery.size)
ack_source.merge(1)
- delivery.credited = true;
}
}
} else {
@@ -1078,9 +1086,9 @@ class OpenwireProtocolHandler extends Pr
for( (id, delivery) <- acked ) {
// only credit once...
if( !delivery.credited ) {
+ delivery.credited = true;
session_manager.delivered(delivery.session, delivery.size)
ack_source.merge(1)
- delivery.credited = true;
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1367524&r1=1367523&r2=1367524&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Jul 31 13:02:38 2012
@@ -789,6 +789,7 @@ class StompProtocolHandler extends Proto
val addresses = consumer.addresses
host.dispatch_queue {
host.router.unbind(addresses, consumer, false , security_context)
+ consumer.release()
}
}
consumers = Map()
@@ -1358,11 +1359,11 @@ class StompProtocolHandler extends Proto
host.dispatch_queue {
val rc = host.router.bind(addresses, consumer, security_context)
- consumer.release
dispatchQueue {
rc match {
case Some(reason)=>
consumers -= id
+ consumer.release
async_die(reason)
case None =>
send_receipt(headers)
@@ -1414,6 +1415,7 @@ class StompProtocolHandler extends Proto
consumers -= id
host.dispatch_queue {
host.router.unbind(consumer.addresses, consumer, persistent, security_context)
+ consumer.release()
send_receipt(headers)
}
}