You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/26 19:42:57 UTC
svn commit: r1390624 - in /activemq/activemq-apollo/trunk/apollo-stomp/src:
main/scala/org/apache/activemq/apollo/stomp/
test/scala/org/apache/activemq/apollo/stomp/test/
Author: chirino
Date: Wed Sep 26 17:42:57 2012
New Revision: 1390624
URL: http://svn.apache.org/viewvc?rev=1390624&view=rev
Log:
Fixes APLO-249: Message expiration does not (always) work on topics
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1390624&r1=1390623&r2=1390624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Sep 26 17:42:57 2012
@@ -483,48 +483,59 @@ class StompProtocolHandler extends Proto
}
val consumer_sink = sink_manager.open()
- val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.map { event =>
+ val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.flatMap { event =>
val (session, delivery) = event
- val message = delivery.message
- var frame = if( message.codec eq StompMessageCodec ) {
- message.asInstanceOf[StompFrameMessage].frame
- } else {
- val (body, content_type) = protocol_convert match{
- case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.codec.id+";conv=body")
- case _ => (message.encoded, "protocol/"+message.codec.id())
- }
- message_id_counter += 1
- var headers = (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil
- headers ::= (CONTENT_TYPE -> ascii(content_type))
- headers ::= (CONTENT_LENGTH -> ascii(body.length().toString))
- headers ::= (DESTINATION -> encode_header(destination_parser.encode_destination(delivery.sender.tail)))
- StompFrame(MESSAGE, headers, BufferContent(body))
- }
- val ack_id = if( (protocol_version eq V1_0) || (protocol_version eq V1_1) ) {
- frame.header(MESSAGE_ID)
+ // perhaps it has expired.. no need to deliver.
+ if( delivery.expiration != 0 && delivery.expiration <= Broker.now ) {
+ session_manager.delivered(session, delivery.size)
+ if( delivery.ack != null ) {
+ delivery.ack(Expired, null)
+ }
+ None
} else {
- val ack_id = checkout_ack_id
- // we need to add the ACK id.
- frame = frame.append_headers((ACK_HEADER->ack_id)::Nil)
- ack_id
- }
+ val message = delivery.message
+ var frame = if( message.codec eq StompMessageCodec ) {
+ message.asInstanceOf[StompFrameMessage].frame
+ } else {
+ val (body, content_type) = protocol_convert match{
+ case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.codec.id+";conv=body")
+ case _ => (message.encoded, "protocol/"+message.codec.id())
+ }
+ message_id_counter += 1
+ var headers = (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil
+ headers ::= (CONTENT_TYPE -> ascii(content_type))
+ headers ::= (CONTENT_LENGTH -> ascii(body.length().toString))
+ headers ::= (DESTINATION -> encode_header(destination_parser.encode_destination(delivery.sender.tail)))
+ StompFrame(MESSAGE, headers, BufferContent(body))
+ }
+
+ val ack_id = if( (protocol_version eq V1_0) || (protocol_version eq V1_1) ) {
+ frame.header(MESSAGE_ID)
+ } else {
+ val ack_id = checkout_ack_id
+ // we need to add the ACK id.
+ frame = frame.append_headers((ACK_HEADER->ack_id)::Nil)
+ ack_id
+ }
- ack_handler.track(session, ack_id, delivery.size, delivery.ack)
+ ack_handler.track(session, ack_id, delivery.size, delivery.ack)
- if( subscription_id != None ) {
- frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
- }
- if( config.add_redeliveries_header!=null && delivery.redeliveries > 0) {
- val header = encode_header(config.add_redeliveries_header)
- val value = ascii(delivery.redeliveries.toString())
- frame = frame.append_headers((header, value)::Nil)
- }
- if( include_seq.isDefined ) {
- frame = frame.append_headers((include_seq.get, ascii(delivery.seq.toString))::Nil)
+ if( subscription_id != None ) {
+ frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
+ }
+ if( config.add_redeliveries_header!=null && delivery.redeliveries > 0) {
+ val header = encode_header(config.add_redeliveries_header)
+ val value = ascii(delivery.redeliveries.toString())
+ frame = frame.append_headers((header, value)::Nil)
+ }
+ if( include_seq.isDefined ) {
+ frame = frame.append_headers((include_seq.get, ascii(delivery.seq.toString))::Nil)
+ }
+ messages_sent += 1
+ Some(frame)
}
- messages_sent += 1
- frame
+
}, SessionDeliverySizer)
credit_window_filter.credit(initial_credit_window.count, initial_credit_window.size)
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1390624&r1=1390623&r2=1390624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Wed Sep 26 17:42:57 2012
@@ -22,6 +22,7 @@ import org.fusesource.hawtbuf.AsciiBuffe
import org.apache.activemq.apollo.broker._
import java.net.{SocketTimeoutException, InetSocketAddress}
import org.apache.activemq.apollo.stomp.{Stomp, StompProtocolHandler}
+import org.fusesource.hawtdispatch._
/**
* These tests can be run in parallel against a single Apollo broker.
@@ -1542,4 +1543,45 @@ class StompParallelTest extends StompTes
disconnect()
}
+ for ( prefix<- List("queued", "block")) {
+ test("APLO-249: Message expiration does not (always) work on topics: "+prefix) {
+ val dest = next_id("/topic/"+prefix+".expiration")
+ val msg_count = 1000
+
+ connect("1.1")
+ subscribe("0", dest, "client")
+
+ Broker.BLOCKABLE_THREAD_POOL {
+ val sender = new StompClient
+ connect("1.1", sender)
+ val exp = System.currentTimeMillis()+500
+ for( i <- 1 to msg_count ) {
+ async_send(dest, "%01024d".format(i), "expires:"+exp+"\n", sender)
+ }
+ sync_send(dest, "DONE", c=sender)
+ close(sender)
+ }
+
+ var done = false
+ var received = 0
+ Thread.sleep(1000)
+ while( !done ) {
+ val (frame, ack) = receive_message()
+ val body = frame.substring(frame.indexOf("\n\n")+2)
+ if( body == "DONE" ) {
+ done = true
+ } else {
+ received +=1
+ ack(true)
+ }
+ }
+
+ val expired = (msg_count-received)
+ println("expired: "+expired)
+ expired should not be(0)
+
+ }
+
+ }
+
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala?rev=1390624&r1=1390623&r2=1390624&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala Wed Sep 26 17:42:57 2012
@@ -129,18 +129,23 @@ class StompTestSupport extends BrokerFun
}
def assert_received(body: Any, sub: String = null, c: StompClient = client, txid: String = null): (Boolean) => Unit = {
- val frame = c.receive()
- frame should startWith("MESSAGE\n")
- if (sub != null) {
- frame should include("subscription:" + sub + "\n")
- }
+ val (frame, ack) = receive_message(sub, c, txid)
body match {
case null =>
case body: scala.util.matching.Regex => frame should endWith regex (body)
case body => frame should endWith("\n\n" + body)
}
+ ack
+ }
+
+ def receive_message(sub: String = null, c: StompClient = client, txid: String = null): (String, (Boolean) => Unit) = {
+ val frame = c.receive()
+ frame should startWith("MESSAGE\n")
+ if (sub != null) {
+ frame should include("subscription:" + sub + "\n")
+ }
// return a func that can ack the message.
- (ack: Boolean) => {
+ (frame, (ack: Boolean) => {
if( c.version == "1.0" || c.version== "1.1" ) {
val sub_regex = """(?s).*\nsubscription:([^\n]+)\n.*""".r
val msgid_regex = """(?s).*\nmessage-id:([^\n]+)\n.*""".r
@@ -163,7 +168,7 @@ class StompTestSupport extends BrokerFun
"\n")
}
- }
+ })
}
def wait_for_receipt(id: String, c: StompClient = client, discard_others: Boolean = false): Unit = {