You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/26 19:42:57 UTC

svn commit: r1390624 - in /activemq/activemq-apollo/trunk/apollo-stomp/src: main/scala/org/apache/activemq/apollo/stomp/ test/scala/org/apache/activemq/apollo/stomp/test/

Author: chirino
Date: Wed Sep 26 17:42:57 2012
New Revision: 1390624

URL: http://svn.apache.org/viewvc?rev=1390624&view=rev
Log:
Fixes APLO-249: Message expiration does not (always) work on topics

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1390624&r1=1390623&r2=1390624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Sep 26 17:42:57 2012
@@ -483,48 +483,59 @@ class StompProtocolHandler extends Proto
     }
 
     val consumer_sink = sink_manager.open()
-    val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.map { event =>
+    val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.flatMap { event =>
       val (session, delivery) = event
-      val message = delivery.message
-      var frame = if( message.codec eq StompMessageCodec ) {
-        message.asInstanceOf[StompFrameMessage].frame
-      } else {
-        val (body, content_type) =  protocol_convert match{
-          case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.codec.id+";conv=body")
-          case _ => (message.encoded, "protocol/"+message.codec.id())
-        }
-        message_id_counter += 1
-        var headers =  (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil
-        headers ::= (CONTENT_TYPE -> ascii(content_type))
-        headers ::= (CONTENT_LENGTH -> ascii(body.length().toString))
-        headers ::= (DESTINATION -> encode_header(destination_parser.encode_destination(delivery.sender.tail)))
-        StompFrame(MESSAGE, headers, BufferContent(body))
-      }
 
-      val ack_id = if( (protocol_version eq V1_0) || (protocol_version eq V1_1) ) {
-        frame.header(MESSAGE_ID)
+      // perhaps it has expired.. no need to deliver.
+      if( delivery.expiration != 0 && delivery.expiration <= Broker.now ) {
+        session_manager.delivered(session, delivery.size)
+        if( delivery.ack != null ) {
+          delivery.ack(Expired, null)
+        }
+        None
       } else {
-        val ack_id = checkout_ack_id
-        // we need to add the ACK id.
-        frame = frame.append_headers((ACK_HEADER->ack_id)::Nil)
-        ack_id
-      }
+        val message = delivery.message
+        var frame = if( message.codec eq StompMessageCodec ) {
+          message.asInstanceOf[StompFrameMessage].frame
+        } else {
+          val (body, content_type) =  protocol_convert match{
+            case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.codec.id+";conv=body")
+            case _ => (message.encoded, "protocol/"+message.codec.id())
+          }
+          message_id_counter += 1
+          var headers =  (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil
+          headers ::= (CONTENT_TYPE -> ascii(content_type))
+          headers ::= (CONTENT_LENGTH -> ascii(body.length().toString))
+          headers ::= (DESTINATION -> encode_header(destination_parser.encode_destination(delivery.sender.tail)))
+          StompFrame(MESSAGE, headers, BufferContent(body))
+        }
+
+        val ack_id = if( (protocol_version eq V1_0) || (protocol_version eq V1_1) ) {
+          frame.header(MESSAGE_ID)
+        } else {
+          val ack_id = checkout_ack_id
+          // we need to add the ACK id.
+          frame = frame.append_headers((ACK_HEADER->ack_id)::Nil)
+          ack_id
+        }
 
-      ack_handler.track(session, ack_id, delivery.size, delivery.ack)
+        ack_handler.track(session, ack_id, delivery.size, delivery.ack)
 
-      if( subscription_id != None ) {
-        frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
-      }
-      if( config.add_redeliveries_header!=null && delivery.redeliveries > 0) {
-        val header = encode_header(config.add_redeliveries_header)
-        val value = ascii(delivery.redeliveries.toString())
-        frame = frame.append_headers((header, value)::Nil)
-      }
-      if( include_seq.isDefined ) {
-        frame = frame.append_headers((include_seq.get, ascii(delivery.seq.toString))::Nil)
+        if( subscription_id != None ) {
+          frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
+        }
+        if( config.add_redeliveries_header!=null && delivery.redeliveries > 0) {
+          val header = encode_header(config.add_redeliveries_header)
+          val value = ascii(delivery.redeliveries.toString())
+          frame = frame.append_headers((header, value)::Nil)
+        }
+        if( include_seq.isDefined ) {
+          frame = frame.append_headers((include_seq.get, ascii(delivery.seq.toString))::Nil)
+        }
+        messages_sent += 1
+        Some(frame)
       }
-      messages_sent += 1
-      frame
+
     }, SessionDeliverySizer)
 
     credit_window_filter.credit(initial_credit_window.count, initial_credit_window.size)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1390624&r1=1390623&r2=1390624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Wed Sep 26 17:42:57 2012
@@ -22,6 +22,7 @@ import org.fusesource.hawtbuf.AsciiBuffe
 import org.apache.activemq.apollo.broker._
 import java.net.{SocketTimeoutException, InetSocketAddress}
 import org.apache.activemq.apollo.stomp.{Stomp, StompProtocolHandler}
+import org.fusesource.hawtdispatch._
 
 /**
  * These tests can be run in parallel against a single Apollo broker.
@@ -1542,4 +1543,45 @@ class StompParallelTest extends StompTes
     disconnect()
   }
 
+  for ( prefix<- List("queued", "block")) {
+    test("APLO-249: Message expiration does not (always) work on topics: "+prefix) {
+      val dest = next_id("/topic/"+prefix+".expiration")
+      val msg_count = 1000
+
+      connect("1.1")
+      subscribe("0", dest, "client")
+
+      Broker.BLOCKABLE_THREAD_POOL {
+        val sender = new StompClient
+        connect("1.1", sender)
+        val exp = System.currentTimeMillis()+500
+        for( i <- 1 to msg_count ) {
+          async_send(dest, "%01024d".format(i), "expires:"+exp+"\n", sender)
+        }
+        sync_send(dest, "DONE", c=sender)
+        close(sender)
+      }
+
+      var done = false
+      var received = 0
+      Thread.sleep(1000)
+      while( !done ) {
+        val (frame, ack) = receive_message()
+        val body = frame.substring(frame.indexOf("\n\n")+2)
+        if( body == "DONE" ) {
+          done = true
+        } else {
+          received +=1
+          ack(true)
+        }
+      }
+
+      val expired = (msg_count-received)
+      println("expired: "+expired)
+      expired should not be(0)
+
+    }
+
+  }
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala?rev=1390624&r1=1390623&r2=1390624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala Wed Sep 26 17:42:57 2012
@@ -129,18 +129,23 @@ class StompTestSupport extends BrokerFun
   }
 
   def assert_received(body: Any, sub: String = null, c: StompClient = client, txid: String = null): (Boolean) => Unit = {
-    val frame = c.receive()
-    frame should startWith("MESSAGE\n")
-    if (sub != null) {
-      frame should include("subscription:" + sub + "\n")
-    }
+    val (frame, ack) = receive_message(sub, c, txid)
     body match {
       case null =>
       case body: scala.util.matching.Regex => frame should endWith regex (body)
       case body => frame should endWith("\n\n" + body)
     }
+    ack
+  }
+
+  def receive_message(sub: String = null, c: StompClient = client, txid: String = null): (String, (Boolean) => Unit) = {
+    val frame = c.receive()
+    frame should startWith("MESSAGE\n")
+    if (sub != null) {
+      frame should include("subscription:" + sub + "\n")
+    }
     // return a func that can ack the message.
-    (ack: Boolean) => {
+    (frame, (ack: Boolean) => {
       if( c.version == "1.0" || c.version== "1.1" ) {
         val sub_regex = """(?s).*\nsubscription:([^\n]+)\n.*""".r
         val msgid_regex = """(?s).*\nmessage-id:([^\n]+)\n.*""".r
@@ -163,7 +168,7 @@ class StompTestSupport extends BrokerFun
 
                   "\n")
       }
-    }
+    })
   }
 
   def wait_for_receipt(id: String, c: StompClient = client, discard_others: Boolean = false): Unit = {