You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/04/11 19:18:43 UTC

svn commit: r1466981 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/ apollo-ope...

Author: chirino
Date: Thu Apr 11 17:18:39 2013
New Revision: 1466981

URL: http://svn.apache.org/r1466981
Log:
Fixes APLO-318: Large transactions sending persistent messages hang.

The queue's max inbound memory buffer was limiting how large the transaction could be.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Thu Apr 11 17:18:39 2013
@@ -795,14 +795,13 @@ class AmqpProtocolHandler extends Protoc
 
     override def dispatch_queue = queue
 
-    val producer_overflow = new OverflowSink[Delivery](this) {
-      /**
-       * Called for each value what is passed on to the down stream sink.
-       */
-      override protected def onDelivered(value: Delivery) {
-        receiver.flow(1)
-        pump_out
-      }
+
+    /**
+     * Called for each value that is passed on to the downstream sink.
+     */
+    override protected def onDelivered(value: Delivery) {
+      receiver.flow(1)
+      pump_out
     }
 
     def onMessage(receiver:Receiver, delivery: DeliveryImpl, m: AmqpMessage) = {
@@ -861,7 +860,7 @@ class AmqpProtocolHandler extends Protoc
             case Some(tx) =>
               tx.add((uow)=>{
                 d.uow = uow
-                val accepted = producer_overflow.offer(d)
+                val accepted = this.offer(d)
                 assert(accepted)
               })
             case None =>
@@ -869,7 +868,7 @@ class AmqpProtocolHandler extends Protoc
           }
           receiver.advance();
         case _ =>
-          val accepted = producer_overflow.offer(d)
+          val accepted = this.offer(d)
           assert(accepted)
           receiver.advance();
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Apr 11 17:18:39 2013
@@ -63,7 +63,7 @@ case class GroupBucket(sub:Subscription)
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val router: LocalRouter, val store_id:Long, var binding:Binding) extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService with DomainDestination with Dispatched with SecuredResource {
+class Queue(val router: LocalRouter, val store_id:Long, var binding:Binding) extends BaseRetained with BindableDeliveryProducer with DeliveryConsumer with BaseService with DomainDestination with DeferringDispatched with SecuredResource {
   override def toString = binding.toString
 
   def virtual_host = router.virtual_host
@@ -716,8 +716,21 @@ class Queue(val router: LocalRouter, val
           assert(delivery.uow !=null)
           val uow = delivery.uow
           entry.state match {
-            case state:entry.Loaded => state.store_enqueue(uow)
-            case state:entry.Swapped => uow.enqueue(entry.toQueueEntryRecord)
+            case state:entry.Loaded =>
+              // Little hack to expand the producer memory window for persistent
+              // messages until the uow completes.  Sender might be sending a very
+              // large UOW which does not fit in the window, in which case the UOW
+              // would never finish.
+              producer_swapped_in.size_max += delivery.size
+              uow.on_flush { canceled =>
+                defer {
+                  producer_swapped_in.size_max -= delivery.size
+                }
+              }
+
+              state.store_enqueue(uow)
+            case state:entry.Swapped =>
+              uow.enqueue(entry.toQueueEntryRecord)
           }
           uow
         } else {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Thu Apr 11 17:18:39 2013
@@ -201,10 +201,11 @@ object DeliveryProducerRoute extends Log
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-abstract class DeliveryProducerRoute(router:Router) extends Sink[Delivery] with BindableDeliveryProducer with DeferringDispatched {
+abstract class DeliveryProducerRoute(router:Router) extends AbstractOverflowSink[Delivery] with BindableDeliveryProducer with DeferringDispatched {
   import DeliveryProducerRoute._
 
   var last_send = Broker.now
+
   val reained_base = new BaseRetained
   def release = reained_base.release
   def retain = reained_base.retain
@@ -220,12 +221,12 @@ abstract class DeliveryProducerRoute(rou
 
   def connected() = defer {
     is_connected = true
-    if( overflow!=null ) {
-      val t = overflow
-      overflow = null
+    if( dispatch_delivery!=null ) {
+      val t = dispatch_delivery
+      dispatch_delivery = null
       _offer(t)
-      if( refiller!=null && !full ) {
-        refiller.run()
+      if( downstream.refiller!=null && !full ) {
+        downstream.refiller.run()
       }
     }
     on_connected
@@ -251,9 +252,9 @@ abstract class DeliveryProducerRoute(rou
       val rc = targets.contains(x.consumer)
       if( rc ) {
         debug("producer route detaching from consumer.")
-        if( !overflowSessions.isEmpty ) {
-          overflowSessions = overflowSessions.filterNot( _ == x )
-          if( overflowSessions.isEmpty ) {
+        if( !dispatch_sessions.isEmpty ) {
+          dispatch_sessions = dispatch_sessions.filterNot( _ == x )
+          if( dispatch_sessions.isEmpty ) {
             drainer.run
           }
         }
@@ -281,30 +282,37 @@ abstract class DeliveryProducerRoute(rou
   // when one of the down stream sinks cannot accept the offered
   // Dispatch.
   //
+  var dispatch_delivery:Delivery=null
+  var dispatch_sessions = List[DeliverySession]()
 
-  var overflow:Delivery=null
-  var overflowSessions = List[DeliverySession]()
-  var refiller:Task=null
-
-  def full = overflow!=null
+  // This is the sink that the overflow goes to.
 
-  def offer(delivery:Delivery):Boolean = {
-    dispatch_queue.assertExecuting()
-    if( full ) {
-      false
-    } else {
-      if (delivery.uow != null) {
-        delivery.uow.retain
-      }
-      if ( !is_connected ) {
-        overflow = delivery
+  object downstream extends Sink[Delivery] {
+    var refiller:Task=null
+    def full = dispatch_delivery!=null
+
+    def offer(delivery:Delivery):Boolean = {
+      if( full ) {
+        false
       } else {
-        _offer(delivery)
+        if ( !is_connected ) {
+          dispatch_delivery = delivery
+        } else {
+          _offer(delivery)
+        }
+        return true
       }
-      return true
     }
   }
 
+  override def offer(delivery: Delivery): Boolean = {
+    dispatch_queue.assertExecuting()
+    if (delivery.uow != null) {
+      delivery.uow.retain
+    }
+    super.offer(delivery)
+  }
+
   private def _offer(delivery:Delivery):Boolean = {
     last_send = Broker.now
 
@@ -356,7 +364,7 @@ abstract class DeliveryProducerRoute(rou
         }
 
         if( !target.offer(copy) ) {
-          overflowSessions ::= target
+          dispatch_sessions ::= target
         }
       }
     }
@@ -365,8 +373,8 @@ abstract class DeliveryProducerRoute(rou
       original_ack(Consumed, null)
     }
 
-    if( overflowSessions!=Nil ) {
-      overflow = copy
+    if( dispatch_sessions!=Nil ) {
+      dispatch_delivery = copy
     } else {
       release(copy)
     }
@@ -384,23 +392,24 @@ abstract class DeliveryProducerRoute(rou
   }
 
   val drainer = ^{
+    dispatch_queue.assertExecuting()
     if( is_connected ) {
-      if( overflow!=null ) {
-        val original = overflowSessions;
-        overflowSessions = Nil
+      if( dispatch_delivery!=null ) {
+        val original = dispatch_sessions;
+        dispatch_sessions = Nil
         original.foreach { target=>
-          if( !target.offer(overflow) ) {
-            overflowSessions ::= target
+          if( !target.offer(dispatch_delivery) ) {
+            dispatch_sessions ::= target
           }
         }
-        if( overflowSessions==Nil ) {
-          release(overflow)
-          overflow = null
-          if(refiller!=null)
-            refiller.run
+        if( dispatch_sessions==Nil ) {
+          release(dispatch_delivery)
+          dispatch_delivery = null
+          if(downstream.refiller!=null)
+            downstream.refiller.run
         }
-      } else if(refiller!=null) {
-        refiller.run
+      } else if(downstream.refiller!=null) {
+        downstream.refiller.run
       }
     }
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Thu Apr 11 17:18:39 2013
@@ -161,7 +161,11 @@ class TransportSink(val transport:Transp
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class OverflowSink[T](val downstream:Sink[T]) extends Sink[T] {
+class OverflowSink[T](val downstream:Sink[T]) extends AbstractOverflowSink[T]
+
+abstract class AbstractOverflowSink[T] extends Sink[T] {
+
+  def downstream:Sink[T]
 
   var refiller:Task = NOOP
 

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java Thu Apr 11 17:18:39 2013
@@ -562,7 +562,7 @@ public class MqttSession {
             if (route.full()) {
                 // but once it gets full.. suspend to flow control the producer.
                 route.suspended = true;
-                handler._suspend_read("blocked sending to: " + route.overflowSessions().mkString(", "));
+                handler._suspend_read("blocked sending to: " + route.dispatch_sessions().mkString(", "));
             }
 
         } else {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Thu Apr 11 17:18:39 2013
@@ -62,13 +62,6 @@ class DestinationAdvisoryRouterListener(
 
 
   class ProducerRoute extends DeliveryProducerRoute(router) {
-    val sink_switcher = new MutableSink[Delivery]
-    val overflow_sink = new OverflowSink(sink_switcher)
-
-    override protected def on_connected = {
-      sink_switcher.downstream = Some(this)
-    }
-
     override def dispatch_queue = router.virtual_host.dispatch_queue
   }
 
@@ -108,23 +101,11 @@ class DestinationAdvisoryRouterListener(
       // replay the destination advisories..
       enabled = true
       if( !destination_advisories.isEmpty ) {
-        val producer = new ProducerRoute {
-          override def on_connected = {
-            overflow_sink.refiller = ^{
-              // once the sink is not overflowed.. then we can disconnect
-              if(!overflow_sink.overflowed) {
-                unbind(consumer::Nil)
-                overflow_sink.refiller = NOOP
-              }
-            }
-            overflow_sink.refiller.run()
-            super.on_connected
-          }
-        }
+        val producer = new ProducerRoute
         producer.bind(consumer::Nil, ()=>{})
         producer.connected()
         for( info <- destination_advisories.values ) {
-          producer.overflow_sink.offer(info)
+          producer.offer(info)
         }
       }
     }
@@ -195,7 +176,7 @@ class DestinationAdvisoryRouterListener(
 
         case route => route
       }
-      route.overflow_sink.offer(delivery)
+      route.offer(delivery)
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Apr 11 17:18:39 2013
@@ -646,11 +646,12 @@ class OpenwireProtocolHandler extends Pr
 
   def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
 
-    producerRoutes.get(msg.getDestination) match {
+    val route = producerRoutes.get(msg.getDestination) match {
       case null =>
         // create the producer route...
         val addresses = to_destination_dto(msg.getDestination, this)
         val route = OpenwireDeliveryProducerRoute(addresses)
+        producerRoutes.put(msg.getDestination, route)
 
         if( uow!=null ) {
           uow.retain
@@ -663,66 +664,56 @@ class OpenwireProtocolHandler extends Pr
             resume_read
             rc match {
               case Some(failure) =>
+                producerRoutes.remove(msg.getDestination)
+                if( route.suspended ) {
+                  route.suspended = false
+                  resume_read()
+                }
                 fail(failure, msg)
               case None =>
-                if (!connection.stopped) {
-                  producerRoutes.put(msg.getDestination, route)
-                  send_via_route(route, msg, uow)
-                }
             }
             if( uow!=null ) {
               uow.release
             }
           }
         }
+        route
 
-      case route =>
-        // we can re-use the existing producer route
-        send_via_route(route, msg, uow)
-
+      case route => route
     }
+    send_via_route(route, msg, uow)
   }
 
   def send_via_route(route:OpenwireDeliveryProducerRoute, message:ActiveMQMessage, uow:StoreUOW) = {
-    if( !route.targets.isEmpty ) {
-
-      // We may need to add some headers..
-      val delivery = new Delivery
-      delivery.message = new OpenwireMessage(message)
-      delivery.expiration = message.getExpiration
-      delivery.persistent = message.isPersistent
-      delivery.size = {
-        val rc = message.getEncodedSize
-        if( rc != 0 )
-          rc
-        else
-          message.getSize
-      }
-      delivery.uow = uow
-
-      if( message.isResponseRequired ) {
-        delivery.ack = { (consumed, uow) =>
-          dispatchQueue <<| ^{
-            ack(message)
-          }
+    // We may need to add some headers..
+    val delivery = new Delivery
+    delivery.message = new OpenwireMessage(message)
+    delivery.expiration = message.getExpiration
+    delivery.persistent = message.isPersistent
+    delivery.size = {
+      val rc = message.getEncodedSize
+      if( rc != 0 )
+        rc
+      else
+        message.getSize
+    }
+    delivery.uow = uow
+
+    if( message.isResponseRequired ) {
+      delivery.ack = { (consumed, uow) =>
+        dispatchQueue <<| ^{
+          ack(message)
         }
       }
+    }
 
-      // routes can always accept at least 1 delivery...
-      assert( !route.full )
-      route.offer(delivery)
-      if( route.full ) {
-        // but once it gets full.. suspend, so that we get more messages
-        // until it's not full anymore.
-        route.suspended = true
-        suspend_read("blocked destination: "+route.overflowSessions.mkString(", "))
-      }
-
-    } else {
-      // info("Dropping message.  No consumers interested in message.")
-      ack(message)
+    route.offer(delivery)
+    if( route.full && !route.suspended) {
+      // but once it gets full.. suspend reads, so that we stop getting more
+      // messages until it's not full anymore.
+      route.suspended = true
+      suspend_read("blocked destination: "+route.dispatch_sessions.mkString(", "))
     }
-    //    message.release
   }
 
   def on_message_ack(info:MessageAck) = {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala Thu Apr 11 17:18:39 2013
@@ -74,6 +74,13 @@ class OpenwireTestSupport extends Broker
     }
     connection
   }
+  def disconnect(connection:Connection=default_connection) = {
+    connection.close()
+    if (connection == default_connection) {
+      default_connection = null
+    }
+    connections = connections.filterNot(_ == connection)
+  }
 
   def receive_text(consumer:MessageConsumer) = consumer.receive().asInstanceOf[TextMessage].getText
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala Thu Apr 11 17:18:39 2013
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.openwire.test
 
-import javax.jms.{TextMessage, Message, MessageListener, Session}
+import javax.jms._
 
 
 /**
@@ -94,10 +94,38 @@ class TransactionTest extends OpenwireTe
     m3.getText should equal(messages(2).getText)
 
   }
-
 }
 
 class OpenwireLevelDBTransactionTest extends TransactionTest {
   override def broker_config_uri = "xml:classpath:apollo-openwire-leveldb.xml"
 
+  test("Large Transaction Test"){
+//    for( i <- 1 to 1000 ) {
+    connect()
+    val dest = queue(next_id("example"))
+    val message_count = 100
+    val producer_session = default_connection.createSession(true, Session.SESSION_TRANSACTED)
+    val producer = producer_session.createProducer(dest)
+    producer.setDeliveryMode(DeliveryMode.PERSISTENT)
+
+    for( i <- 1 to message_count) {
+      val x = producer_session.createTextMessage("x" * (1024*64))
+      x.setIntProperty("i", i)
+      producer.send(x)
+    }
+
+    // commit so consumer can see it
+    producer_session.commit()
+
+    val consumer_session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val consumer = consumer_session.createConsumer(dest)
+
+    for( i <- 1 to message_count) {
+      val m = consumer.receive(1000).asInstanceOf[TextMessage]
+      m should not be (null)
+      m.getIntProperty("i") should be (i)
+    }
+//    disconnect() }
+  }
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Apr 11 17:18:39 2013
@@ -849,7 +849,6 @@ class StompProtocolHandler extends Proto
     connection_sink.refiller =  ^ {
       if( connection_sink_read_suspended ) {
         connection_sink_read_suspended = false
-        println("connection_sink: resume_read")
         resume_read()
       }
     }
@@ -1399,54 +1398,43 @@ class StompProtocolHandler extends Proto
     // User might be asking for ack that we have processed the message..
     val receipt = frame.header(RECEIPT_REQUESTED)
 
-    if( !route.targets.isEmpty ) {
-
-      // We may need to add some headers..
-      var message = updated_headers(addresses, frame.headers) match {
-        case Nil=>
-          StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous))
-        case updated_headers =>
-          StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous, updated_headers))
-      }
-
-      val delivery = new Delivery
-      delivery.message = message
-      delivery.expiration = message.expiration
-      delivery.persistent = message.persistent
-      delivery.size = message.frame.size
-      delivery.uow = uow
-      get(frame.headers, RETAIN).foreach { retain =>
-        delivery.retain = retain match {
-          case SET => RetainSet
-          case REMOVE => RetainRemove
-          case _ => RetainIgnore
-        }
+    // We may need to add some headers..
+    var message = updated_headers(addresses, frame.headers) match {
+      case Nil=>
+        StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous))
+      case updated_headers =>
+        StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, frame.contiguous, updated_headers))
+    }
+
+    val delivery = new Delivery
+    delivery.message = message
+    delivery.expiration = message.expiration
+    delivery.persistent = message.persistent
+    delivery.size = message.frame.size
+    delivery.uow = uow
+    get(frame.headers, RETAIN).foreach { retain =>
+      delivery.retain = retain match {
+        case SET => RetainSet
+        case REMOVE => RetainRemove
+        case _ => RetainIgnore
       }
+    }
 
-      if( receipt!=null ) {
-        val trimmed_receipt = receipt.deepCopy().ascii()
-        delivery.ack = { (consumed, uow) =>
-          defer {
-            send_receipt(trimmed_receipt)
-          }
+    if( receipt!=null ) {
+      val trimmed_receipt = receipt.deepCopy().ascii()
+      delivery.ack = { (consumed, uow) =>
+        defer {
+          send_receipt(trimmed_receipt)
         }
       }
+    }
 
-      // routes can always accept at least 1 delivery...
-      assert( !route.full )
-      route.offer(delivery)
-      if( route.full ) {
-        // but once it gets full.. suspend, so that we get more stomp messages
-        // until it's not full anymore.
-        route.suspended = true
-        suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
-      }
-
-    } else {
-      // info("Dropping message.  No consumers interested in message.")
-      if( receipt!=null ) {
-        send_receipt(receipt)
-      }
+    route.offer(delivery)
+    if( route.full && !route.suspended ) {
+      // but once it gets full.. suspend reads, so that we stop getting more
+      // stomp messages until it's not full anymore.
+      route.suspended = true
+      suspend_read("blocked sending to: "+route.dispatch_sessions.mkString(", "))
     }
     frame.release
   }
@@ -1718,7 +1706,6 @@ class StompProtocolHandler extends Proto
     connection_sink.offer(frame)
     if( connection_sink.overflow.size() > 1000 && !connection_sink_read_suspended) {
       connection_sink_read_suspended = true
-      println("connection_sink: suspend_read")
       suspend_read("client to drain receipts")
     }
     frame

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1466981&r1=1466980&r2=1466981&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Thu Apr 11 17:18:39 2013
@@ -183,7 +183,7 @@ class StompParallelTest extends StompTes
 
     val udp_port: Int = connector_port("stomp-udp").get
     val channel = DatagramChannel.open();
-    println("The UDP port is: "+udp_port)
+    info("The UDP port is: "+udp_port)
 
     val target = new InetSocketAddress("127.0.0.1", udp_port)
     channel.send(new AsciiBuffer(
@@ -623,7 +623,7 @@ class StompParallelTest extends StompTes
     send_receive
 
     var expected_mapping = actual_mapping
-    println(expected_mapping)
+    info(expected_mapping.toString())
     expected_mapping.get("1").get.intersect(expected_mapping.get("2").get).isEmpty should be(true)
 
     actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
@@ -638,7 +638,7 @@ class StompParallelTest extends StompTes
     actual_mapping = mutable.HashMap[String, mutable.HashSet[Char]]()
     send_receive
     expected_mapping = actual_mapping
-    println(expected_mapping)
+    info(expected_mapping.toString())
 
     expected_mapping.get("1").get.intersect(expected_mapping.get("2").get).isEmpty should be(true)
     expected_mapping.get("2").get.intersect(expected_mapping.get("3").get).isEmpty should be(true)
@@ -1596,7 +1596,7 @@ class StompParallelTest extends StompTes
       }
 
       val expired = (msg_count-received)
-      println("expired: "+expired)
+      info("expired: "+expired)
       expired should not be(0)
 
     }
@@ -1615,7 +1615,7 @@ class StompParallelTest extends StompTes
     val body = "x"*1024*10
     Broker.BLOCKABLE_THREAD_POOL {
       for( i <- 1 to 10 ) {
-        println("sending: "+i)
+        info("sending: "+i)
         val client = connect("1.1", new StompClient)
         async_send(dest, body, c=client)
         disconnect(client)