You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/04/11 19:18:43 UTC
svn commit: r1466981 - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/ apollo-ope...
Author: chirino
Date: Thu Apr 11 17:18:39 2013
New Revision: 1466981
URL: http://svn.apache.org/r1466981
Log:
Fixes APLO-318: Large transactions sending persistent messages hang.
The queue's max inbound memory buffer was limiting how large the transaction could be.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Thu Apr 11 17:18:39 2013
@@ -795,14 +795,13 @@ class AmqpProtocolHandler extends Protoc
override def dispatch_queue = queue
- val producer_overflow = new OverflowSink[Delivery](this) {
- /**
- * Called for each value what is passed on to the down stream sink.
- */
- override protected def onDelivered(value: Delivery) {
- receiver.flow(1)
- pump_out
- }
+
+ /**
+ * Called for each value that is passed on to the downstream sink.
+ */
+ override protected def onDelivered(value: Delivery) {
+ receiver.flow(1)
+ pump_out
}
def onMessage(receiver:Receiver, delivery: DeliveryImpl, m: AmqpMessage) = {
@@ -861,7 +860,7 @@ class AmqpProtocolHandler extends Protoc
case Some(tx) =>
tx.add((uow)=>{
d.uow = uow
- val accepted = producer_overflow.offer(d)
+ val accepted = this.offer(d)
assert(accepted)
})
case None =>
@@ -869,7 +868,7 @@ class AmqpProtocolHandler extends Protoc
}
receiver.advance();
case _ =>
- val accepted = producer_overflow.offer(d)
+ val accepted = this.offer(d)
assert(accepted)
receiver.advance();
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Apr 11 17:18:39 2013
@@ -63,7 +63,7 @@ case class GroupBucket(sub:Subscription)
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val router: LocalRouter, val store_id:Long, var binding:Binding) extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService with DomainDestination with Dispatched with SecuredResource {
+class Queue(val router: LocalRouter, val store_id:Long, var binding:Binding) extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService with DomainDestination with DeferringDispatched with SecuredResource {
override def toString = binding.toString
def virtual_host = router.virtual_host
@@ -716,8 +716,21 @@ class Queue(val router: LocalRouter, val
assert(delivery.uow !=null)
val uow = delivery.uow
entry.state match {
- case state:entry.Loaded => state.store_enqueue(uow)
- case state:entry.Swapped => uow.enqueue(entry.toQueueEntryRecord)
+ case state:entry.Loaded =>
+ // Little hack to expand the producer memory window for persistent
+ // messages until the uow completes. Sender might be sending a very
+ // large UOW which does not fit in the window and then the UOW does
+ // not finish.
+ producer_swapped_in.size_max += delivery.size
+ uow.on_flush { canceled =>
+ defer {
+ producer_swapped_in.size_max -= delivery.size
+ }
+ }
+
+ state.store_enqueue(uow)
+ case state:entry.Swapped =>
+ uow.enqueue(entry.toQueueEntryRecord)
}
uow
} else {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Thu Apr 11 17:18:39 2013
@@ -201,10 +201,11 @@ object DeliveryProducerRoute extends Log
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-abstract class DeliveryProducerRoute(router:Router) extends Sink[Delivery] with BindableDeliveryProducer with DeferringDispatched {
+abstract class DeliveryProducerRoute(router:Router) extends AbstractOverflowSink[Delivery] with BindableDeliveryProducer with DeferringDispatched {
import DeliveryProducerRoute._
var last_send = Broker.now
+
val reained_base = new BaseRetained
def release = reained_base.release
def retain = reained_base.retain
@@ -220,12 +221,12 @@ abstract class DeliveryProducerRoute(rou
def connected() = defer {
is_connected = true
- if( overflow!=null ) {
- val t = overflow
- overflow = null
+ if( dispatch_delivery!=null ) {
+ val t = dispatch_delivery
+ dispatch_delivery = null
_offer(t)
- if( refiller!=null && !full ) {
- refiller.run()
+ if( downstream.refiller!=null && !full ) {
+ downstream.refiller.run()
}
}
on_connected
@@ -251,9 +252,9 @@ abstract class DeliveryProducerRoute(rou
val rc = targets.contains(x.consumer)
if( rc ) {
debug("producer route detaching from consumer.")
- if( !overflowSessions.isEmpty ) {
- overflowSessions = overflowSessions.filterNot( _ == x )
- if( overflowSessions.isEmpty ) {
+ if( !dispatch_sessions.isEmpty ) {
+ dispatch_sessions = dispatch_sessions.filterNot( _ == x )
+ if( dispatch_sessions.isEmpty ) {
drainer.run
}
}
@@ -281,30 +282,37 @@ abstract class DeliveryProducerRoute(rou
// when one of the down stream sinks cannot accept the offered
// Dispatch.
//
+ var dispatch_delivery:Delivery=null
+ var dispatch_sessions = List[DeliverySession]()
- var overflow:Delivery=null
- var overflowSessions = List[DeliverySession]()
- var refiller:Task=null
-
- def full = overflow!=null
+ // This is the sink that the overflow goes to.
- def offer(delivery:Delivery):Boolean = {
- dispatch_queue.assertExecuting()
- if( full ) {
- false
- } else {
- if (delivery.uow != null) {
- delivery.uow.retain
- }
- if ( !is_connected ) {
- overflow = delivery
+ object downstream extends Sink[Delivery] {
+ var refiller:Task=null
+ def full = dispatch_delivery!=null
+
+ def offer(delivery:Delivery):Boolean = {
+ if( full ) {
+ false
} else {
- _offer(delivery)
+ if ( !is_connected ) {
+ dispatch_delivery = delivery
+ } else {
+ _offer(delivery)
+ }
+ return true
}
- return true
}
}
+ override def offer(delivery: Delivery): Boolean = {
+ dispatch_queue.assertExecuting()
+ if (delivery.uow != null) {
+ delivery.uow.retain
+ }
+ super.offer(delivery)
+ }
+
private def _offer(delivery:Delivery):Boolean = {
last_send = Broker.now
@@ -356,7 +364,7 @@ abstract class DeliveryProducerRoute(rou
}
if( !target.offer(copy) ) {
- overflowSessions ::= target
+ dispatch_sessions ::= target
}
}
}
@@ -365,8 +373,8 @@ abstract class DeliveryProducerRoute(rou
original_ack(Consumed, null)
}
- if( overflowSessions!=Nil ) {
- overflow = copy
+ if( dispatch_sessions!=Nil ) {
+ dispatch_delivery = copy
} else {
release(copy)
}
@@ -384,23 +392,24 @@ abstract class DeliveryProducerRoute(rou
}
val drainer = ^{
+ dispatch_queue.assertExecuting()
if( is_connected ) {
- if( overflow!=null ) {
- val original = overflowSessions;
- overflowSessions = Nil
+ if( dispatch_delivery!=null ) {
+ val original = dispatch_sessions;
+ dispatch_sessions = Nil
original.foreach { target=>
- if( !target.offer(overflow) ) {
- overflowSessions ::= target
+ if( !target.offer(dispatch_delivery) ) {
+ dispatch_sessions ::= target
}
}
- if( overflowSessions==Nil ) {
- release(overflow)
- overflow = null
- if(refiller!=null)
- refiller.run
+ if( dispatch_sessions==Nil ) {
+ release(dispatch_delivery)
+ dispatch_delivery = null
+ if(downstream.refiller!=null)
+ downstream.refiller.run
}
- } else if(refiller!=null) {
- refiller.run
+ } else if(downstream.refiller!=null) {
+ downstream.refiller.run
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Thu Apr 11 17:18:39 2013
@@ -161,7 +161,11 @@ class TransportSink(val transport:Transp
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class OverflowSink[T](val downstream:Sink[T]) extends Sink[T] {
+class OverflowSink[T](val downstream:Sink[T]) extends AbstractOverflowSink[T]
+
+abstract class AbstractOverflowSink[T] extends Sink[T] {
+
+ def downstream:Sink[T]
var refiller:Task = NOOP
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java Thu Apr 11 17:18:39 2013
@@ -562,7 +562,7 @@ public class MqttSession {
if (route.full()) {
// but once it gets full.. suspend to flow control the producer.
route.suspended = true;
- handler._suspend_read("blocked sending to: " + route.overflowSessions().mkString(", "));
+ handler._suspend_read("blocked sending to: " + route.dispatch_sessions().mkString(", "));
}
} else {
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Thu Apr 11 17:18:39 2013
@@ -62,13 +62,6 @@ class DestinationAdvisoryRouterListener(
class ProducerRoute extends DeliveryProducerRoute(router) {
- val sink_switcher = new MutableSink[Delivery]
- val overflow_sink = new OverflowSink(sink_switcher)
-
- override protected def on_connected = {
- sink_switcher.downstream = Some(this)
- }
-
override def dispatch_queue = router.virtual_host.dispatch_queue
}
@@ -108,23 +101,11 @@ class DestinationAdvisoryRouterListener(
// replay the destination advisories..
enabled = true
if( !destination_advisories.isEmpty ) {
- val producer = new ProducerRoute {
- override def on_connected = {
- overflow_sink.refiller = ^{
- // once the sink is not overflowed.. then we can disconnect
- if(!overflow_sink.overflowed) {
- unbind(consumer::Nil)
- overflow_sink.refiller = NOOP
- }
- }
- overflow_sink.refiller.run()
- super.on_connected
- }
- }
+ val producer = new ProducerRoute
producer.bind(consumer::Nil, ()=>{})
producer.connected()
for( info <- destination_advisories.values ) {
- producer.overflow_sink.offer(info)
+ producer.offer(info)
}
}
}
@@ -195,7 +176,7 @@ class DestinationAdvisoryRouterListener(
case route => route
}
- route.overflow_sink.offer(delivery)
+ route.offer(delivery)
}
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Apr 11 17:18:39 2013
@@ -646,11 +646,12 @@ class OpenwireProtocolHandler extends Pr
def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
- producerRoutes.get(msg.getDestination) match {
+ val route = producerRoutes.get(msg.getDestination) match {
case null =>
// create the producer route...
val addresses = to_destination_dto(msg.getDestination, this)
val route = OpenwireDeliveryProducerRoute(addresses)
+ producerRoutes.put(msg.getDestination, route)
if( uow!=null ) {
uow.retain
@@ -663,66 +664,56 @@ class OpenwireProtocolHandler extends Pr
resume_read
rc match {
case Some(failure) =>
+ producerRoutes.remove(msg.getDestination)
+ if( route.suspended ) {
+ route.suspended = false
+ resume_read()
+ }
fail(failure, msg)
case None =>
- if (!connection.stopped) {
- producerRoutes.put(msg.getDestination, route)
- send_via_route(route, msg, uow)
- }
}
if( uow!=null ) {
uow.release
}
}
}
+ route
- case route =>
- // we can re-use the existing producer route
- send_via_route(route, msg, uow)
-
+ case route => route
}
+ send_via_route(route, msg, uow)
}
def send_via_route(route:OpenwireDeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
- if( !route.targets.isEmpty ) {
-
- // We may need to add some headers..
- val delivery = new Delivery
- delivery.message = new OpenwireMessage(message)
- delivery.expiration = message.getExpiration
- delivery.persistent = message.isPersistent
- delivery.size = {
- val rc = message.getEncodedSize
- if( rc != 0 )
- rc
- else
- message.getSize
- }
- delivery.uow = uow
-
- if( message.isResponseRequired ) {
- delivery.ack = { (consumed, uow) =>
- dispatchQueue <<| ^{
- ack(message)
- }
+ // We may need to add some headers..
+ val delivery = new Delivery
+ delivery.message = new OpenwireMessage(message)
+ delivery.expiration = message.getExpiration
+ delivery.persistent = message.isPersistent
+ delivery.size = {
+ val rc = message.getEncodedSize
+ if( rc != 0 )
+ rc
+ else
+ message.getSize
+ }
+ delivery.uow = uow
+
+ if( message.isResponseRequired ) {
+ delivery.ack = { (consumed, uow) =>
+ dispatchQueue <<| ^{
+ ack(message)
}
}
+ }
- // routes can always accept at least 1 delivery...
- assert( !route.full )
- route.offer(delivery)
- if( route.full ) {
- // but once it gets full.. suspend, so that we get more messages
- // until it's not full anymore.
- route.suspended = true
- suspend_read("blocked destination: "+route.overflowSessions.mkString(", "))
- }
-
- } else {
- // info("Dropping message. No consumers interested in message.")
- ack(message)
+ route.offer(delivery)
+ if( route.full && !route.suspended) {
+ // but once it gets full.. suspend reads, so that we don't get more
+ // messages until it's not full anymore.
+ route.suspended = true
+ suspend_read("blocked destination: "+route.dispatch_sessions.mkString(", "))
}
- // message.release
}
def on_message_ack(info:MessageAck) = {
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala Thu Apr 11 17:18:39 2013
@@ -74,6 +74,13 @@ class OpenwireTestSupport extends Broker
}
connection
}
+ def disconnect(connection:Connection=default_connection) = {
+ connection.close()
+ if (connection == default_connection) {
+ default_connection = null
+ }
+ connections = connections.filterNot(_ == connection)
+ }
def receive_text(consumer:MessageConsumer) = consumer.receive().asInstanceOf[TextMessage].getText
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala Thu Apr 11 17:18:39 2013
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.apollo.openwire.test
-import javax.jms.{TextMessage, Message, MessageListener, Session}
+import javax.jms._
/**
@@ -94,10 +94,38 @@ class TransactionTest extends OpenwireTe
m3.getText should equal(messages(2).getText)
}
-
}
class OpenwireLevelDBTransactionTest extends TransactionTest {
override def broker_config_uri = "xml:classpath:apollo-openwire-leveldb.xml"
+ test("Large Transaction Test"){
+// for( i <- 1 to 1000 ) {
+ connect()
+ val dest = queue(next_id("example"))
+ val message_count = 100
+ val producer_session = default_connection.createSession(true, Session.SESSION_TRANSACTED)
+ val producer = producer_session.createProducer(dest)
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT)
+
+ for( i <- 1 to message_count) {
+ val x = producer_session.createTextMessage("x" * (1024*64))
+ x.setIntProperty("i", i)
+ producer.send(x)
+ }
+
+ // commit so consumer can see it
+ producer_session.commit()
+
+ val consumer_session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val consumer = consumer_session.createConsumer(dest)
+
+ for( i <- 1 to message_count) {
+ val m = consumer.receive(1000).asInstanceOf[TextMessage]
+ m should not be (null)
+ m.getIntProperty("i") should be (i)
+ }
+// disconnect() }
+ }
+
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Apr 11 17:18:39 2013
@@ -849,7 +849,6 @@ class StompProtocolHandler extends Proto
connection_sink.refiller = ^ {
if( connection_sink_read_suspended ) {
connection_sink_read_suspended = false
- println("connection_sink: resume_read")
resume_read()
}
}
@@ -1399,54 +1398,43 @@ class StompProtocolHandler extends Proto
// User might be asking for ack that we have processed the message..
val receipt = frame.header(RECEIPT_REQUESTED)
- if( !route.targets.isEmpty ) {
-
- // We may need to add some headers..
- var message = updated_headers(addresses, frame.headers) match {
- case Nil=>
- StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous))
- case updated_headers =>
- StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous, updated_headers))
- }
-
- val delivery = new Delivery
- delivery.message = message
- delivery.expiration = message.expiration
- delivery.persistent = message.persistent
- delivery.size = message.frame.size
- delivery.uow = uow
- get(frame.headers, RETAIN).foreach { retain =>
- delivery.retain = retain match {
- case SET => RetainSet
- case REMOVE => RetainRemove
- case _ => RetainIgnore
- }
+ // We may need to add some headers..
+ var message = updated_headers(addresses, frame.headers) match {
+ case Nil=>
+ StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous))
+ case updated_headers =>
+ StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous, updated_headers))
+ }
+
+ val delivery = new Delivery
+ delivery.message = message
+ delivery.expiration = message.expiration
+ delivery.persistent = message.persistent
+ delivery.size = message.frame.size
+ delivery.uow = uow
+ get(frame.headers, RETAIN).foreach { retain =>
+ delivery.retain = retain match {
+ case SET => RetainSet
+ case REMOVE => RetainRemove
+ case _ => RetainIgnore
}
+ }
- if( receipt!=null ) {
- val trimmed_receipt = receipt.deepCopy().ascii()
- delivery.ack = { (consumed, uow) =>
- defer {
- send_receipt(trimmed_receipt)
- }
+ if( receipt!=null ) {
+ val trimmed_receipt = receipt.deepCopy().ascii()
+ delivery.ack = { (consumed, uow) =>
+ defer {
+ send_receipt(trimmed_receipt)
}
}
+ }
- // routes can always accept at least 1 delivery...
- assert( !route.full )
- route.offer(delivery)
- if( route.full ) {
- // but once it gets full.. suspend, so that we get more stomp messages
- // until it's not full anymore.
- route.suspended = true
- suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
- }
-
- } else {
- // info("Dropping message. No consumers interested in message.")
- if( receipt!=null ) {
- send_receipt(receipt)
- }
+ route.offer(delivery)
+ if( route.full && !route.suspended ) {
+ // but once it gets full.. suspend reads, so that we don't get more
+ // stomp messages until it's not full anymore.
+ route.suspended = true
+ suspend_read("blocked sending to: "+route.dispatch_sessions.mkString(", "))
}
frame.release
}
@@ -1718,7 +1706,6 @@ class StompProtocolHandler extends Proto
connection_sink.offer(frame)
if( connection_sink.overflow.size() > 1000 && !connection_sink_read_suspended) {
connection_sink_read_suspended = true
- println("connection_sink: suspend_read")
suspend_read("client to drain receipts")
}
frame
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Thu Apr 11 17:18:39 2013
@@ -183,7 +183,7 @@ class StompParallelTest extends StompTes
val udp_port: Int = connector_port("stomp-udp").get
val channel = DatagramChannel.open();
- println("The UDP port is: "+udp_port)
+ info("The UDP port is: "+udp_port)
val target = new InetSocketAddress("127.0.0.1", udp_port)
channel.send(new AsciiBuffer(
@@ -623,7 +623,7 @@ class StompParallelTest extends StompTes
send_receive
var expected_mapping = actual_mapping
- println(expected_mapping)
+ info(expected_mapping.toString())
expected_mapping.get("1").get.intersect(expected_mapping.get("2").get).isEmpty should be(true)
actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
@@ -638,7 +638,7 @@ class StompParallelTest extends StompTes
actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
send_receive
expected_mapping = actual_mapping
- println(expected_mapping)
+ info(expected_mapping.toString())
expected_mapping.get("1").get.intersect(expected_mapping.get("2").get).isEmpty should be(true)
expected_mapping.get("2").get.intersect(expected_mapping.get("3").get).isEmpty should be(true)
@@ -1596,7 +1596,7 @@ class StompParallelTest extends StompTes
}
val expired = (msg_count-received)
- println("expired: "+expired)
+ info("expired: "+expired)
expired should not be(0)
}
@@ -1615,7 +1615,7 @@ class StompParallelTest extends StompTes
val body = "x"*1024*10
Broker.BLOCKABLE_THREAD_POOL {
for( i <- 1 to 10 ) {
- println("sending: "+i)
+ info("sending: "+i)
val client = connect("1.1", new StompClient)
async_send(dest, body, c=client)
disconnect(client)