You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2014/01/31 01:10:57 UTC

[1/3] git commit: Fixes APLO-324: ConcurrentModificationException in recent snapshot

Updated Branches:
  refs/heads/trunk a325f0fbe -> a6629f94b


Fixes APLO-324: ConcurrentModificationException in recent snapshot

Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/2fe48977
Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/2fe48977
Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/2fe48977

Branch: refs/heads/trunk
Commit: 2fe48977063c11709849d22b0fdbd34c1b7bf477
Parents: a325f0f
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Thu Jan 30 16:13:03 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Thu Jan 30 16:13:03 2014 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/apollo/stomp/StompProtocolHandler.scala   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/2fe48977/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
----------------------------------------------------------------------
diff --git a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
index 7854823..8c338b0 100644
--- a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
+++ b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
@@ -950,8 +950,9 @@ class StompProtocolHandler extends ProtocolHandler {
       })
       trace("stomp protocol resources released")
 
+      val route_values = producer_routes.values().toArray;
       waiting_on = ()=> {
-        val routes = producer_routes.values().flatMap { route =>
+        val routes = route_values.flatMap { route =>
           if( route.routing_items > 0 ) {
             Some(route.dest+"("+route.routing_items+")")
           } else {


[3/3] git commit: Implements APLO-301: Add a "ttl" header to control message expiration

Posted by ch...@apache.org.
Implements APLO-301: Add a "ttl" header to control message expiration

Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/a6629f94
Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/a6629f94
Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/a6629f94

Branch: refs/heads/trunk
Commit: a6629f94b61cfa0b2b09107efc0ffcc75d315c29
Parents: c54cff3
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Thu Jan 30 18:50:22 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Thu Jan 30 19:10:40 2014 -0500

----------------------------------------------------------------------
 .../apache/activemq/apollo/broker/Queue.scala   |  4 ++-
 .../activemq/apollo/stomp/StompFrame.scala      |  1 +
 .../apollo/stomp/StompProtocolHandler.scala     | 10 +++++-
 .../apollo/stomp/test/StompParallelTest.scala   | 38 ++++++++++++++++++++
 .../src/documentation/stomp-manual.md           | 23 ++++++++++--
 5 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/a6629f94/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
----------------------------------------------------------------------
diff --git a/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala b/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
index 1312ba5..4b9d47e 100644
--- a/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
+++ b/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
@@ -1173,7 +1173,9 @@ class Queue(val router: LocalRouter, val store_id:Long, var binding:Binding) ext
           expired(actual, entry.entry) {
             entry.ack(actual)
           }
-          actual.release
+          if( actual!=null ){
+            actual.release
+          }
         case Delivered =>
           entry.increment_nack
           entry.entry.redelivered

http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/a6629f94/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
----------------------------------------------------------------------
diff --git a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
index 1737954..e74f328 100644
--- a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
+++ b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
@@ -394,6 +394,7 @@ object Stomp {
   val CORRELATION_ID = ascii("correlation-id")
   val REPLY_TO = ascii("reply-to")
   val EXPIRES = ascii("expires")
+  val TTL = ascii("ttl")
   val PRIORITY = ascii("priority")
   val TYPE = ascii("type")
   val PERSISTENT = ascii("persistent")

http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/a6629f94/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
----------------------------------------------------------------------
diff --git a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
index 8c338b0..f928a0e 100644
--- a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
+++ b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
@@ -950,7 +950,7 @@ class StompProtocolHandler extends ProtocolHandler {
       })
       trace("stomp protocol resources released")
 
-      val route_values = producer_routes.values().toArray;
+      val route_values = producer_routes.values().toSeq.toArray;
       waiting_on = ()=> {
         val routes = route_values.flatMap { route =>
           if( route.routing_items > 0 ) {
@@ -1413,6 +1413,14 @@ class StompProtocolHandler extends ProtocolHandler {
       }
     }
 
+    // Do we need to add an expires header?
+    for( ttl <- get( headers, TTL) ) {
+      if( get( headers, EXPIRES)==None ) {
+        val expiration = Broker.now + java.lang.Long.parseLong(ttl.toString)
+        rc ::= (EXPIRES -> ascii(expiration.toString))
+      }
+    }
+
     // Do we need to add the message id?
     if( get( headers, MESSAGE_ID) == None ) {
       message_id_counter += 1

http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/a6629f94/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
----------------------------------------------------------------------
diff --git a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
index 497231a..567b013 100644
--- a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
+++ b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
@@ -1023,6 +1023,44 @@ class StompParallelTest extends StompTestSupport with BrokerParallelTestExecutio
     get("3")
   }
 
+  test("Messages Expire Using TTL") {
+    connect("1.1")
+
+    def put(msg: String, ttl: Option[Long] = None) = {
+      val ttl_header = ttl.map(t => "ttl:" + t + "\n").getOrElse("")
+      client.write(
+        "SEND\n" +
+                ttl_header +
+                "destination:/queue/exp2\n" +
+                "\n" +
+                "message:" + msg + "\n")
+    }
+
+    put("1")
+    put("2", Some(1000L))
+    put("3")
+
+    Thread.sleep(2000)
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/exp2\n" +
+              "id:1\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+
+    def get(dest: String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
+    }
+
+    get("1")
+    get("3")
+  }
+
   test("Expired message sent to DLQ") {
     connect("1.1")
 

http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/a6629f94/apollo-website/src/documentation/stomp-manual.md
----------------------------------------------------------------------
diff --git a/apollo-website/src/documentation/stomp-manual.md b/apollo-website/src/documentation/stomp-manual.md
index 04b2f56..3b36acc 100644
--- a/apollo-website/src/documentation/stomp-manual.md
+++ b/apollo-website/src/documentation/stomp-manual.md
@@ -217,9 +217,12 @@ redelivered to another subscribed client.
 ### Message Expiration
 
 ${project_name} supports expiring old messages.  Unconsumed expired messages 
-are automatically removed from the queue.  You just need to specify when
-the message expires by setting the `expires` message header.  The expiration
-time must be specified as the number of milliseconds since the Unix epoch.
+are automatically removed from the queue.  There's two way to specify
+when the message will be expired.  Y
+
+The first way to configure the expiration is by setting the `expires` message 
+header.  The expiration time must be specified as the number of milliseconds 
+since the Unix epoch.
 
 Example:
 
@@ -229,6 +232,20 @@ Example:
 
     this message will expire on Tue Jun 21 17:02:28 EDT 2011
     ^@
+    
+The first way to configure the expiration is by setting the `ttl` message 
+header.  The ttl will be intereted to mean the number of milliseconds from
+when the server receives the message.  The broker will add an `expires`
+header to the message on your behalf.
+
+Example:
+
+    SEND
+    destination:/queue/a
+    ttl:2000
+
+    This message will expire in 2 seconds.
+    ^@
 
 ### Subscription Flow Control
 


[2/3] git commit: To aid in trouble shooting errors liek APLO-317, lets add -XX:+HeapDumpOnOutOfMemoryError to the default JVM options.

Posted by ch...@apache.org.
To aid in trouble shooting errors liek APLO-317, lets add -XX:+HeapDumpOnOutOfMemoryError to the default JVM options.

Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/c54cff31
Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/c54cff31
Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/c54cff31

Branch: refs/heads/trunk
Commit: c54cff3138827bf78bc2edb181aa73ef30c89c30
Parents: 2fe4897
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Thu Jan 30 16:18:28 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Thu Jan 30 16:18:28 2014 -0500

----------------------------------------------------------------------
 apollo-distro/src/main/release/bin/apollo | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/c54cff31/apollo-distro/src/main/release/bin/apollo
----------------------------------------------------------------------
diff --git a/apollo-distro/src/main/release/bin/apollo b/apollo-distro/src/main/release/bin/apollo
index e502fb5..ea7cc03 100755
--- a/apollo-distro/src/main/release/bin/apollo
+++ b/apollo-distro/src/main/release/bin/apollo
@@ -124,7 +124,7 @@ if $cygwin; then
 fi
 
 if [ -z "$JVM_FLAGS" ] ; then
-  JVM_FLAGS="-server -Xmx1G -XX:-UseBiasedLocking"
+  JVM_FLAGS="-server -Xmx1G -XX:+HeapDumpOnOutOfMemoryError -XX:-UseBiasedLocking"
 fi
 
 if [ "$APOLLO_ASSERTIONS" != "false" ] ; then