You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/01/21 20:37:04 UTC
svn commit: r1061978 - in /activemq/activemq-apollo/trunk:
apollo-selector/src/main/java/org/apache/activemq/apollo/filter/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/
Author: chirino
Date: Fri Jan 21 19:37:03 2011
New Revision: 1061978
URL: http://svn.apache.org/viewvc?rev=1061978&view=rev
Log:
Better selector handling. STOMP can now deal with comparison operators correctly.
Modified:
activemq/activemq-apollo/trunk/apollo-selector/src/main/java/org/apache/activemq/apollo/filter/ComparisonExpression.java
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-selector/src/main/java/org/apache/activemq/apollo/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-selector/src/main/java/org/apache/activemq/apollo/filter/ComparisonExpression.java?rev=1061978&r1=1061977&r2=1061978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-selector/src/main/java/org/apache/activemq/apollo/filter/ComparisonExpression.java (original)
+++ activemq/activemq-apollo/trunk/apollo-selector/src/main/java/org/apache/activemq/apollo/filter/ComparisonExpression.java Fri Jan 21 19:37:03 2011
@@ -350,73 +350,115 @@ public abstract class ComparisonExpressi
// If the objects are not of the same type,
// try to convert up to allow the comparison.
if (lc != rc) {
- if (lc == Byte.class) {
- if (rc == Short.class) {
- lv = Short.valueOf(((Number)lv).shortValue());
- } else if (rc == Integer.class) {
- lv = Integer.valueOf(((Number)lv).intValue());
- } else if (rc == Long.class) {
- lv = Long.valueOf(((Number)lv).longValue());
- } else if (rc == Float.class) {
- lv = new Float(((Number)lv).floatValue());
- } else if (rc == Double.class) {
- lv = new Double(((Number)lv).doubleValue());
- } else {
- return Boolean.FALSE;
- }
- } else if (lc == Short.class) {
- if (rc == Integer.class) {
- lv = Integer.valueOf(((Number)lv).intValue());
- } else if (rc == Long.class) {
- lv = Long.valueOf(((Number)lv).longValue());
- } else if (rc == Float.class) {
- lv = new Float(((Number)lv).floatValue());
- } else if (rc == Double.class) {
- lv = new Double(((Number)lv).doubleValue());
- } else {
- return Boolean.FALSE;
- }
- } else if (lc == Integer.class) {
- if (rc == Long.class) {
- lv = Long.valueOf(((Number)lv).longValue());
- } else if (rc == Float.class) {
- lv = new Float(((Number)lv).floatValue());
- } else if (rc == Double.class) {
- lv = new Double(((Number)lv).doubleValue());
- } else {
- return Boolean.FALSE;
- }
- } else if (lc == Long.class) {
- if (rc == Integer.class) {
- rv = Long.valueOf(((Number)rv).longValue());
- } else if (rc == Float.class) {
- lv = new Float(((Number)lv).floatValue());
- } else if (rc == Double.class) {
- lv = new Double(((Number)lv).doubleValue());
- } else {
- return Boolean.FALSE;
- }
- } else if (lc == Float.class) {
- if (rc == Integer.class) {
- rv = new Float(((Number)rv).floatValue());
- } else if (rc == Long.class) {
- rv = new Float(((Number)rv).floatValue());
- } else if (rc == Double.class) {
- lv = new Double(((Number)lv).doubleValue());
- } else {
- return Boolean.FALSE;
- }
- } else if (lc == Double.class) {
- if (rc == Integer.class) {
- rv = new Double(((Number)rv).doubleValue());
- } else if (rc == Long.class) {
- rv = new Double(((Number)rv).doubleValue());
- } else if (rc == Float.class) {
- rv = new Float(((Number)rv).doubleValue());
+ try {
+ if (lc == Boolean.class) {
+ if (rc == String.class) {
+ rv = Boolean.valueOf((String)rv);
+ } else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Byte.class) {
+ if (rc == Short.class) {
+ lv = Short.valueOf(((Number)lv).shortValue());
+ } else if (rc == Integer.class) {
+ lv = Integer.valueOf(((Number)lv).intValue());
+ } else if (rc == Long.class) {
+ lv = Long.valueOf(((Number)lv).longValue());
+ } else if (rc == Float.class) {
+ lv = new Float(((Number)lv).floatValue());
+ } else if (rc == Double.class) {
+ lv = new Double(((Number)lv).doubleValue());
+ } else if (rc == String.class) {
+ rv = Byte.valueOf((String)rv);
+ } else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Short.class) {
+ if (rc == Integer.class) {
+ lv = Integer.valueOf(((Number)lv).intValue());
+ } else if (rc == Long.class) {
+ lv = Long.valueOf(((Number)lv).longValue());
+ } else if (rc == Float.class) {
+ lv = new Float(((Number)lv).floatValue());
+ } else if (rc == Double.class) {
+ lv = new Double(((Number)lv).doubleValue());
+ } else if (rc == String.class) {
+ rv = Short.valueOf((String)rv);
+ } else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Integer.class) {
+ if (rc == Long.class) {
+ lv = Long.valueOf(((Number)lv).longValue());
+ } else if (rc == Float.class) {
+ lv = new Float(((Number)lv).floatValue());
+ } else if (rc == Double.class) {
+ lv = new Double(((Number)lv).doubleValue());
+ } else if (rc == String.class) {
+ rv = Integer.valueOf((String)rv);
+ } else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Long.class) {
+ if (rc == Integer.class) {
+ rv = Long.valueOf(((Number)rv).longValue());
+ } else if (rc == Float.class) {
+ lv = new Float(((Number)lv).floatValue());
+ } else if (rc == Double.class) {
+ lv = new Double(((Number)lv).doubleValue());
+ } else if (rc == String.class) {
+ rv = Long.valueOf((String)rv);
+ } else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Float.class) {
+ if (rc == Integer.class) {
+ rv = new Float(((Number)rv).floatValue());
+ } else if (rc == Long.class) {
+ rv = new Float(((Number)rv).floatValue());
+ } else if (rc == Double.class) {
+ lv = new Double(((Number)lv).doubleValue());
+ } else if (rc == String.class) {
+ rv = Float.valueOf((String)rv);
+ } else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Double.class) {
+ if (rc == Integer.class) {
+ rv = new Double(((Number)rv).doubleValue());
+ } else if (rc == Long.class) {
+ rv = new Double(((Number)rv).doubleValue());
+ } else if (rc == Float.class) {
+ rv = new Double(((Number)rv).doubleValue());
+ } else if (rc == String.class) {
+ rv = Double.valueOf((String)rv);
+ } else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == String.class) {
+
+ if (rc == Boolean.class) {
+ lv = Boolean.valueOf((String)lv);
+ } else if (rc == Byte.class) {
+ lv = Byte.valueOf((String)lv);
+ } else if (rc == Short.class) {
+ lv = Short.valueOf((String)lv);
+ } else if (rc == Integer.class) {
+ lv = Integer.valueOf((String)lv);
+ } else if (rc == Long.class) {
+ lv = Long.valueOf((String)lv);
+ } else if (rc == Float.class) {
+ lv = Float.valueOf((String)lv);
+ } else if (rc == Double.class) {
+ lv = Double.valueOf((String)lv);
+ } else {
+ return Boolean.FALSE;
+ }
+
} else {
return Boolean.FALSE;
}
- } else {
+ } catch (NumberFormatException e) {
return Boolean.FALSE;
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1061978&r1=1061977&r2=1061978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Jan 21 19:37:03 2011
@@ -892,10 +892,12 @@ class StompProtocolHandler extends Proto
die("The subscription '%s' not found.".format(id))
case Some(consumer)=>
+ // consumer gets disposed after all producers stop sending to it...
+ consumer.setDisposer(^{ send_receipt(headers) })
+
consumers -= id
if( consumer.binding==null ) {
host.router.unbind(consumer.destination, consumer)
- send_receipt(headers)
} else {
reset {
@@ -905,20 +907,12 @@ class StompProtocolHandler extends Proto
if( persistent && consumer.binding!=null ) {
reset {
- val rc = host.router.destroy_queue(consumer.binding, security_context)
- rc match {
- case Failure(reason) =>
- async_die(reason)
- case Success(_) =>
- send_receipt(headers)
+ host.router.destroy_queue(consumer.binding, security_context).failure_option.foreach{ reason=>
+ async_die(reason)
}
}
- } else {
- send_receipt(headers)
}
-
}
-
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1061978&r1=1061977&r2=1061978&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri Jan 21 19:37:03 2011
@@ -225,6 +225,55 @@ class Stomp11HeartBeatTest extends Stomp
class StompDestinationTest extends StompTestSupport {
+ test("Selector Syntax") {
+ connect("1.1")
+
+ var sub_id=0;
+ def test_selector(selector:String, headers: List[String], expected_matches: List[Int]) = {
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/topic/selected\n" +
+ "selector:"+selector+"\n" +
+ "receipt:0\n"+
+ "id:"+sub_id+"\n" +
+ "\n")
+ wait_for_receipt("0")
+
+ var id=1;
+
+ headers.foreach { header=>
+ client.write(
+ "SEND\n" +
+ "destination:/topic/selected\n" +
+ header+"\n" +
+ "\n" +
+ "message:"+id+"\n")
+ id += 1;
+ }
+
+ expected_matches.foreach{id=>
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+ }
+
+ client.write(
+ "UNSUBSCRIBE\n" +
+ "id:"+sub_id+"\n" +
+ "receipt:0\n"+
+ "\n")
+
+ wait_for_receipt("0")
+
+ sub_id+=1
+ }
+
+ test_selector("color = 'red'", List("color:blue", "not:set", "color:red"), List(3))
+ test_selector("age >= 21", List("age:3", "not:set", "age:21", "age:30"), List(3,4))
+
+ }
+
test("Queues load balance across subscribers") {
connect("1.1")
@@ -581,6 +630,7 @@ class StompDestinationTest extends Stomp
get(3)
}
+
}
class StompSslDestinationTest extends StompDestinationTest {