You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2014/01/31 01:10:57 UTC
[1/3] git commit: Fixes APLO-324: ConcurrentModificationException in
recent snapshot
Updated Branches:
refs/heads/trunk a325f0fbe -> a6629f94b
Fixes APLO-324: ConcurrentModificationException in recent snapshot
Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/2fe48977
Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/2fe48977
Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/2fe48977
Branch: refs/heads/trunk
Commit: 2fe48977063c11709849d22b0fdbd34c1b7bf477
Parents: a325f0f
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Thu Jan 30 16:13:03 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Thu Jan 30 16:13:03 2014 -0500
----------------------------------------------------------------------
.../org/apache/activemq/apollo/stomp/StompProtocolHandler.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/2fe48977/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
----------------------------------------------------------------------
diff --git a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
index 7854823..8c338b0 100644
--- a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
+++ b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
@@ -950,8 +950,9 @@ class StompProtocolHandler extends ProtocolHandler {
})
trace("stomp protocol resources released")
+ val route_values = producer_routes.values().toArray;
waiting_on = ()=> {
- val routes = producer_routes.values().flatMap { route =>
+ val routes = route_values.flatMap { route =>
if( route.routing_items > 0 ) {
Some(route.dest+"("+route.routing_items+")")
} else {
[3/3] git commit: Implements APLO-301: Add a "ttl" header to control
message expiration
Posted by ch...@apache.org.
Implements APLO-301: Add a "ttl" header to control message expiration
Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/a6629f94
Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/a6629f94
Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/a6629f94
Branch: refs/heads/trunk
Commit: a6629f94b61cfa0b2b09107efc0ffcc75d315c29
Parents: c54cff3
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Thu Jan 30 18:50:22 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Thu Jan 30 19:10:40 2014 -0500
----------------------------------------------------------------------
.../apache/activemq/apollo/broker/Queue.scala | 4 ++-
.../activemq/apollo/stomp/StompFrame.scala | 1 +
.../apollo/stomp/StompProtocolHandler.scala | 10 +++++-
.../apollo/stomp/test/StompParallelTest.scala | 38 ++++++++++++++++++++
.../src/documentation/stomp-manual.md | 23 ++++++++++--
5 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/a6629f94/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
----------------------------------------------------------------------
diff --git a/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala b/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
index 1312ba5..4b9d47e 100644
--- a/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
+++ b/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
@@ -1173,7 +1173,9 @@ class Queue(val router: LocalRouter, val store_id:Long, var binding:Binding) ext
expired(actual, entry.entry) {
entry.ack(actual)
}
- actual.release
+ if( actual!=null ){
+ actual.release
+ }
case Delivered =>
entry.increment_nack
entry.entry.redelivered
http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/a6629f94/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
----------------------------------------------------------------------
diff --git a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
index 1737954..e74f328 100644
--- a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
+++ b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
@@ -394,6 +394,7 @@ object Stomp {
val CORRELATION_ID = ascii("correlation-id")
val REPLY_TO = ascii("reply-to")
val EXPIRES = ascii("expires")
+ val TTL = ascii("ttl")
val PRIORITY = ascii("priority")
val TYPE = ascii("type")
val PERSISTENT = ascii("persistent")
http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/a6629f94/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
----------------------------------------------------------------------
diff --git a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
index 8c338b0..f928a0e 100644
--- a/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
+++ b/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
@@ -950,7 +950,7 @@ class StompProtocolHandler extends ProtocolHandler {
})
trace("stomp protocol resources released")
- val route_values = producer_routes.values().toArray;
+ val route_values = producer_routes.values().toSeq.toArray;
waiting_on = ()=> {
val routes = route_values.flatMap { route =>
if( route.routing_items > 0 ) {
@@ -1413,6 +1413,14 @@ class StompProtocolHandler extends ProtocolHandler {
}
}
+ // Do we need to add an expires header?
+ for( ttl <- get( headers, TTL) ) {
+ if( get( headers, EXPIRES)==None ) {
+ val expiration = Broker.now + java.lang.Long.parseLong(ttl.toString)
+ rc ::= (EXPIRES -> ascii(expiration.toString))
+ }
+ }
+
// Do we need to add the message id?
if( get( headers, MESSAGE_ID) == None ) {
message_id_counter += 1
http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/a6629f94/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
----------------------------------------------------------------------
diff --git a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
index 497231a..567b013 100644
--- a/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
+++ b/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
@@ -1023,6 +1023,44 @@ class StompParallelTest extends StompTestSupport with BrokerParallelTestExecutio
get("3")
}
+ test("Messages Expire Using TTL") {
+ connect("1.1")
+
+ def put(msg: String, ttl: Option[Long] = None) = {
+ val ttl_header = ttl.map(t => "ttl:" + t + "\n").getOrElse("")
+ client.write(
+ "SEND\n" +
+ ttl_header +
+ "destination:/queue/exp2\n" +
+ "\n" +
+ "message:" + msg + "\n")
+ }
+
+ put("1")
+ put("2", Some(1000L))
+ put("3")
+
+ Thread.sleep(2000)
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/exp2\n" +
+ "id:1\n" +
+ "receipt:0\n" +
+ "\n")
+ wait_for_receipt("0")
+
+
+ def get(dest: String) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith("\n\nmessage:%s\n".format(dest))
+ }
+
+ get("1")
+ get("3")
+ }
+
test("Expired message sent to DLQ") {
connect("1.1")
http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/a6629f94/apollo-website/src/documentation/stomp-manual.md
----------------------------------------------------------------------
diff --git a/apollo-website/src/documentation/stomp-manual.md b/apollo-website/src/documentation/stomp-manual.md
index 04b2f56..3b36acc 100644
--- a/apollo-website/src/documentation/stomp-manual.md
+++ b/apollo-website/src/documentation/stomp-manual.md
@@ -217,9 +217,12 @@ redelivered to another subscribed client.
### Message Expiration
${project_name} supports expiring old messages. Unconsumed expired messages
-are automatically removed from the queue. You just need to specify when
-the message expires by setting the `expires` message header. The expiration
-time must be specified as the number of milliseconds since the Unix epoch.
+are automatically removed from the queue. There's two way to specify
+when the message will be expired. Y
+
+The first way to configure the expiration is by setting the `expires` message
+header. The expiration time must be specified as the number of milliseconds
+since the Unix epoch.
Example:
@@ -229,6 +232,20 @@ Example:
this message will expire on Tue Jun 21 17:02:28 EDT 2011
^@
+
+The first way to configure the expiration is by setting the `ttl` message
+header. The ttl will be intereted to mean the number of milliseconds from
+when the server receives the message. The broker will add an `expires`
+header to the message on your behalf.
+
+Example:
+
+ SEND
+ destination:/queue/a
+ ttl:2000
+
+ This message will expire in 2 seconds.
+ ^@
### Subscription Flow Control
[2/3] git commit: To aid in trouble shooting errors liek APLO-317,
lets add -XX:+HeapDumpOnOutOfMemoryError to the default JVM options.
Posted by ch...@apache.org.
To aid in trouble shooting errors liek APLO-317, lets add -XX:+HeapDumpOnOutOfMemoryError to the default JVM options.
Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/c54cff31
Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/c54cff31
Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/c54cff31
Branch: refs/heads/trunk
Commit: c54cff3138827bf78bc2edb181aa73ef30c89c30
Parents: 2fe4897
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Thu Jan 30 16:18:28 2014 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Thu Jan 30 16:18:28 2014 -0500
----------------------------------------------------------------------
apollo-distro/src/main/release/bin/apollo | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/c54cff31/apollo-distro/src/main/release/bin/apollo
----------------------------------------------------------------------
diff --git a/apollo-distro/src/main/release/bin/apollo b/apollo-distro/src/main/release/bin/apollo
index e502fb5..ea7cc03 100755
--- a/apollo-distro/src/main/release/bin/apollo
+++ b/apollo-distro/src/main/release/bin/apollo
@@ -124,7 +124,7 @@ if $cygwin; then
fi
if [ -z "$JVM_FLAGS" ] ; then
- JVM_FLAGS="-server -Xmx1G -XX:-UseBiasedLocking"
+ JVM_FLAGS="-server -Xmx1G -XX:+HeapDumpOnOutOfMemoryError -XX:-UseBiasedLocking"
fi
if [ "$APOLLO_ASSERTIONS" != "false" ] ; then