You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:10:30 UTC
svn commit: r961143 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/
activemq-selector/src/main/java/org/apache/active...
Author: chirino
Date: Wed Jul 7 04:10:30 2010
New Revision: 961143
URL: http://svn.apache.org/viewvc?rev=961143&view=rev
Log:
initial pass at implementing selectors for stomp.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Filterable.java
- copied, changed from r961142, activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java
activemq/sandbox/activemq-apollo-actor/activemq-selector/src/test/java/org/apache/activemq/selector/SelectorTest.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 04:10:30 2010
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.apollo.broker
-import _root_.org.apache.activemq.filter.{MessageEvaluationContext}
+import _root_.org.apache.activemq.filter.{Filterable}
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf._
@@ -73,7 +73,7 @@ trait DeliverySession extends Sink[Deliv
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait Message {
+trait Message extends Filterable {
/**
* the globally unique id of the message
@@ -107,11 +107,6 @@ trait Message {
def destination: Destination
/**
- * used to apply a selector against the message.
- */
- def messageEvaluationContext:MessageEvaluationContext
-
- /**
* The protocol encoding of the message.
*/
def protocol:AsciiBuffer
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 04:10:30 2010
@@ -298,14 +298,24 @@ class DeliveryProducerRoute(val router:R
if( full ) {
false
} else {
- if( delivery.message.persistent && router.host.store!=null ) {
- delivery.storeBatch = router.host.store.createStoreBatch
- delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
- }
+
+ // Do we need to store the message if we have a matching consumer?
+ var storeOnMatch = delivery.message.persistent && router.host.store!=null
targets.foreach { target=>
- if( !target.offer(delivery) ) {
- overflowSessions ::= target
+
+ // only deliver to matching consumers
+ if( target.consumer.matches(delivery) ) {
+
+ if( storeOnMatch ) {
+ delivery.storeBatch = router.host.store.createStoreBatch
+ delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
+ storeOnMatch = false
+ }
+
+ if( !target.offer(delivery) ) {
+ overflowSessions ::= target
+ }
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jul 7 04:10:30 2010
@@ -303,12 +303,10 @@ class DurableSubscription(val host:Virtu
// }
}
- def matches(message:Delivery) = {
+ def matches(delivery:Delivery) = {
if (selector != null) {
- var selectorContext = message.message.messageEvaluationContext
- selectorContext.setDestination(destination);
try {
- (selector.matches(selectorContext));
+ (selector.matches( delivery.message ));
} catch {
case e:FilterException=>
e.printStackTrace();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java Wed Jul 7 04:10:30 2010
@@ -22,7 +22,7 @@ import java.util.ArrayList;
import org.apache.activemq.apollo.broker.Destination;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.FilterException;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.Filterable;
import org.fusesource.hawtbuf.AsciiBuffer;
@@ -31,18 +31,17 @@ import org.fusesource.hawtbuf.AsciiBuffe
*
* @version $Revision: 1.3 $
*/
-public abstract class PathFilter implements BooleanExpression {
+public abstract class PathFilter {
public static final AsciiBuffer ANY_DESCENDENT = new AsciiBuffer(">");
public static final AsciiBuffer ANY_CHILD = new AsciiBuffer("*");
- public boolean matches(MessageEvaluationContext message) throws FilterException {
- Destination destination = message.getDestination();
+ public boolean matches(Destination destination) throws FilterException {
return matches(destination.getName());
}
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
- return matches(message) ? Boolean.TRUE : Boolean.FALSE;
+ public Object evaluate(Destination destination) throws FilterException {
+ return matches(destination) ? Boolean.TRUE : Boolean.FALSE;
}
public abstract boolean matches(AsciiBuffer path);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java Wed Jul 7 04:10:30 2010
@@ -181,7 +181,7 @@ public abstract class ArithmeticExpressi
}
}
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
Object lvalue = left.evaluate(message);
if (lvalue == null) {
return null;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java Wed Jul 7 04:10:30 2010
@@ -31,6 +31,6 @@ public interface BooleanExpression exten
* @return true if the expression evaluates to Boolean.TRUE.
* @throws FilterException
*/
- boolean matches(MessageEvaluationContext message) throws FilterException;
+ boolean matches(Filterable message) throws FilterException;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java Wed Jul 7 04:10:30 2010
@@ -118,9 +118,9 @@ public abstract class ComparisonExpressi
}
/**
- * @see org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
+ * @see org.apache.activemq.filter.Expression#evaluate(Filterable)
*/
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
Object rv = this.getRight().evaluate(message);
@@ -137,7 +137,7 @@ public abstract class ComparisonExpressi
return likePattern.matcher((String)rv).matches() ? Boolean.TRUE : Boolean.FALSE;
}
- public boolean matches(MessageEvaluationContext message) throws FilterException {
+ public boolean matches(Filterable message) throws FilterException {
Object object = evaluate(message);
return object != null && object == Boolean.TRUE;
}
@@ -199,7 +199,7 @@ public abstract class ComparisonExpressi
private static BooleanExpression doCreateEqual(Expression left, Expression right) {
return new ComparisonExpression(left, right) {
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
Object lv = left.evaluate(message);
Object rv = right.evaluate(message);
@@ -332,7 +332,7 @@ public abstract class ComparisonExpressi
}
}
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
Comparable<Comparable> lv = (Comparable)left.evaluate(message);
if (lv == null) {
return null;
@@ -425,7 +425,7 @@ public abstract class ComparisonExpressi
protected abstract boolean asBoolean(int answer);
- public boolean matches(MessageEvaluationContext message) throws FilterException {
+ public boolean matches(Filterable message) throws FilterException {
Object object = evaluate(message);
return object != null && object == Boolean.TRUE;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java Wed Jul 7 04:10:30 2010
@@ -31,7 +31,7 @@ public class ConstantExpression implemen
super(value);
}
- public boolean matches(MessageEvaluationContext message) throws FilterException {
+ public boolean matches(Filterable message) throws FilterException {
Object object = evaluate(message);
return object != null && object == Boolean.TRUE;
}
@@ -92,7 +92,7 @@ public class ConstantExpression implemen
return new ConstantExpression(value);
}
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
return value;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java Wed Jul 7 04:10:30 2010
@@ -29,6 +29,6 @@ public interface Expression {
/**
* @return the value of this expression
*/
- Object evaluate(MessageEvaluationContext message) throws FilterException;
+ Object evaluate(Filterable message) throws FilterException;
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Filterable.java (from r961142, activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Filterable.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Filterable.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java&r1=961142&r2=961143&rev=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Filterable.java Wed Jul 7 04:10:30 2010
@@ -16,21 +16,38 @@
*/
package org.apache.activemq.filter;
-
-
/**
- * A BooleanExpression is an expression that always
- * produces a Boolean result.
- *
- * @version $Revision: 1.2 $
+ * A Filterable is the object being evaluated by the filters. It provides
+ * access to filtered properties.
+ *
+ * @version $Revision: 1.4 $
*/
-public interface BooleanExpression extends Expression {
-
+public interface Filterable {
+
/**
- * @param message
- * @return true if the expression evaluates to Boolean.TRUE.
+ * This method is used by message filters which do content-based routing (like the
+ * XPath-based selectors).
+ *
+ * @param <T>
+ * @param type
+ * @return
* @throws FilterException
*/
- boolean matches(MessageEvaluationContext message) throws FilterException;
+ <T> T getBodyAs(Class<T> type) throws FilterException;
+
+ /**
+ * Extracts the named message property
+ *
+ * @param name
+ * @return
+ */
+ Object getProperty(String name);
+
+ /**
+ * Used by the NoLocal filter.
+ *
+ * @return a unique id for the connection that produced the message.
+ */
+ Object getLocalConnectionId();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java Wed Jul 7 04:10:30 2010
@@ -35,7 +35,7 @@ public abstract class LogicExpression ex
public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) {
return new LogicExpression(lvalue, rvalue) {
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
Boolean lv = (Boolean)left.evaluate(message);
// Can we do an OR shortcut??
@@ -56,7 +56,7 @@ public abstract class LogicExpression ex
public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) {
return new LogicExpression(lvalue, rvalue) {
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
Boolean lv = (Boolean)left.evaluate(message);
@@ -78,9 +78,9 @@ public abstract class LogicExpression ex
};
}
- public abstract Object evaluate(MessageEvaluationContext message) throws FilterException;
+ public abstract Object evaluate(Filterable message) throws FilterException;
- public boolean matches(MessageEvaluationContext message) throws FilterException {
+ public boolean matches(Filterable message) throws FilterException {
Object object = evaluate(message);
return object != null && object == Boolean.TRUE;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java Wed Jul 7 04:10:30 2010
@@ -26,17 +26,13 @@ package org.apache.activemq.filter;
public class PropertyExpression implements Expression {
private final String name;
- private Expression expression;
public PropertyExpression(String name) {
this.name = name;
}
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
- if (expression == null) {
- expression = message.getPropertyExpression(name);
- }
- return expression.evaluate(message);
+ public Object evaluate(Filterable message) throws FilterException {
+ return message.getProperty(name);
}
public String getName() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java Wed Jul 7 04:10:30 2010
@@ -39,7 +39,7 @@ public abstract class UnaryExpression im
public static Expression createNegate(Expression left) {
return new UnaryExpression(left) {
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
Object rvalue = right.evaluate(message);
if (rvalue == null) {
return null;
@@ -70,7 +70,7 @@ public abstract class UnaryExpression im
final Collection<Object> inList = t;
return new BooleanUnaryExpression(right) {
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
Object rvalue = right.evaluate(message);
if (rvalue == null) {
@@ -124,7 +124,7 @@ public abstract class UnaryExpression im
super(left);
}
- public boolean matches(MessageEvaluationContext message) throws FilterException {
+ public boolean matches(Filterable message) throws FilterException {
Object object = evaluate(message);
return object != null && object == Boolean.TRUE;
}
@@ -132,7 +132,7 @@ public abstract class UnaryExpression im
public static BooleanExpression createNOT(BooleanExpression left) {
return new BooleanUnaryExpression(left) {
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
Boolean lvalue = (Boolean)right.evaluate(message);
if (lvalue == null) {
return null;
@@ -156,7 +156,7 @@ public abstract class UnaryExpression im
public static BooleanExpression createBooleanCast(Expression left) {
return new BooleanUnaryExpression(left) {
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
Object rvalue = right.evaluate(message);
if (rvalue == null) {
return null;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java Wed Jul 7 04:10:30 2010
@@ -58,7 +58,7 @@ public final class XPathExpression imple
private final XPathEvaluator evaluator;
public static interface XPathEvaluator {
- boolean evaluate(MessageEvaluationContext message) throws FilterException;
+ boolean evaluate(Filterable message) throws FilterException;
}
XPathExpression(String xpath) {
@@ -88,7 +88,7 @@ public final class XPathExpression imple
}
}
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
return evaluator.evaluate(message) ? Boolean.TRUE : Boolean.FALSE;
}
@@ -101,7 +101,7 @@ public final class XPathExpression imple
* @return true if the expression evaluates to Boolean.TRUE.
* @throws FilterException
*/
- public boolean matches(MessageEvaluationContext message) throws FilterException {
+ public boolean matches(Filterable message) throws FilterException {
Object object = evaluate(message);
return object != null && object == Boolean.TRUE;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java Wed Jul 7 04:10:30 2010
@@ -28,7 +28,7 @@ public final class XQueryExpression impl
this.xpath = xpath;
}
- public Object evaluate(MessageEvaluationContext message) throws FilterException {
+ public Object evaluate(Filterable message) throws FilterException {
return Boolean.FALSE;
}
@@ -41,7 +41,7 @@ public final class XQueryExpression impl
* @return true if the expression evaluates to Boolean.TRUE.
* @throws FilterException
*/
- public boolean matches(MessageEvaluationContext message) throws FilterException {
+ public boolean matches(Filterable message) throws FilterException {
Object object = evaluate(message);
return object != null && object == Boolean.TRUE;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java Wed Jul 7 04:10:30 2010
@@ -39,7 +39,7 @@ public class XalanXPathEvaluator impleme
this.xpath = xpath;
}
- public boolean evaluate(MessageEvaluationContext m) throws FilterException {
+ public boolean evaluate(Filterable m) throws FilterException {
String stringBody = m.getBodyAs(String.class);
if (stringBody!=null) {
return evaluate(stringBody);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/test/java/org/apache/activemq/selector/SelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/test/java/org/apache/activemq/selector/SelectorTest.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/test/java/org/apache/activemq/selector/SelectorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/test/java/org/apache/activemq/selector/SelectorTest.java Wed Jul 7 04:10:30 2010
@@ -23,14 +23,14 @@ import junit.framework.TestCase;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.Expression;
import org.apache.activemq.filter.FilterException;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.Filterable;
/**
* @version $Revision: 1.7 $
*/
public class SelectorTest extends TestCase {
- class MockMessage implements MessageEvaluationContext {
+ class MockMessage implements Filterable {
HashMap<String, Object> properties = new HashMap<String, Object>();
private String text;
@@ -95,7 +95,17 @@ public class SelectorTest extends TestCa
return null;
}
- public <T> T getDestination() {
+ public Object getProperty(String name) {
+ if( "JMSType".equals(name) ) {
+ return type;
+ }
+ if( "JMSMessageID".equals(name) ) {
+ return messageId;
+ }
+ return properties.get(name);
+ }
+
+ public <T> T getDestination() {
return (T)destination;
}
@@ -103,20 +113,6 @@ public class SelectorTest extends TestCa
return localConnectionId;
}
- public Expression getPropertyExpression(final String name) {
- return new Expression() {
- public Object evaluate(MessageEvaluationContext mc) throws FilterException {
- MockMessage mockMessage = (MockMessage)mc;
- if( "JMSType".equals(name) ) {
- return mockMessage.type;
- }
- if( "JMSMessageID".equals(name) ) {
- return mockMessage.messageId;
- }
- return mockMessage.properties.get(name);
- }
- };
- }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul 7 04:10:30 2010
@@ -27,8 +27,16 @@ object StompBroker {
var address = "0.0.0.0"
var port = 61613
+ var storeType = "hawtdb"
+
+ def main(args:Array[String]) = run
+
+ def run = {
+ println("=======================")
+ println("Press ENTER to shutdown");
+ println("=======================")
+ println("")
- def main(args:Array[String]) = {
val uri = "tcp://"+address+":"+port
println("Starting stomp broker: "+uri)
@@ -39,24 +47,49 @@ object StompBroker {
connector.protocol = "stomp"
connector.advertise = uri
-// val store = new CassandraStoreDTO
-// store.hosts.add("localhost:9160")
-
- val store = new HawtDBStoreDTO
- store.directory = new File("activemq-data")
-
+ val store = storeType match {
+ case "none" =>
+ null
+
+ case "hawtdb" =>
+ val rc = new HawtDBStoreDTO
+ rc.directory = new File("activemq-data")
+ rc
+
+ case "cassandra" =>
+ val rc = new CassandraStoreDTO
+ rc.hosts.add("localhost:9160")
+ rc
+ }
broker.config.virtualHosts.get(0).store = store
- val tracker = new LoggingTracker("broker startup")
+
+
+ var tracker = new LoggingTracker("broker startup")
tracker.start(broker)
tracker.await
println("Startup complete.")
System.in.read
+
println("Shutting down...")
- broker.stop
- println("Shutdown complete.")
+ tracker = new LoggingTracker("broker shutdown")
+ tracker.stop(broker)
+ tracker.await
+
+ println("=======================")
+ println("Shutdown");
+ println("=======================")
+
}
-
+ override def toString() = {
+ "--------------------------------------\n"+
+ "StompBroker Properties\n"+
+ "--------------------------------------\n"+
+ "address = "+address+"\n"+
+ "port = "+port+"\n"+
+ "storeType = "+storeType+"\n" +
+ ""
+ }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 04:10:30 2010
@@ -17,10 +17,11 @@
package org.apache.activemq.apollo.stomp
import _root_.java.util.LinkedList
-import _root_.org.apache.activemq.filter.{Expression, MessageEvaluationContext}
+import _root_.org.apache.activemq.filter.{Expression, Filterable}
import _root_.org.fusesource.hawtbuf._
import collection.mutable.ListBuffer
import org.apache.activemq.apollo.broker.{Sizer, Destination, BufferConversions, Message}
+import java.lang.{String, Class}
/**
*
@@ -36,6 +37,7 @@ object StompFrameConstants {
import StompFrameConstants._
import StompConstants._;
import BufferConversions._
+import Buffer._
case class StompFrameMessage(frame:StompFrame) extends Message {
@@ -72,33 +74,6 @@ case class StompFrameMessage(frame:Stomp
*/
var destination: Destination = null
- /**
- * used to apply a selector against the message.
- */
- lazy val messageEvaluationContext = new MessageEvaluationContext() {
-
- def getBodyAs[T](clazz:Class[T]) = {
- throw new UnsupportedOperationException
- }
-
- def getPropertyExpression(name:String):Expression = {
- throw new UnsupportedOperationException
- }
-
- @deprecated("this should go away.")
- def getLocalConnectionId() = {
- throw new UnsupportedOperationException
- }
-
- def getDestination[T]():T = {
- throw new UnsupportedOperationException
- }
-
- def setDestination(destination:Any):Unit = {
- throw new UnsupportedOperationException
- }
- }
-
for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
header match {
case (Stomp.Headers.Message.MESSAGE_ID, value) =>
@@ -114,8 +89,55 @@ case class StompFrameMessage(frame:Stomp
case _ =>
}
}
+
+ def getBodyAs[T](toType : Class[T]) = {
+ (if( toType == classOf[String] ) {
+ frame.content.utf8
+ } else if (toType == classOf[Buffer]) {
+ frame.content
+ } else if (toType == classOf[AsciiBuffer]) {
+ frame.content.ascii
+ } else if (toType == classOf[UTF8Buffer]) {
+ frame.content.utf8
+ } else {
+ null
+ }).asInstanceOf[T]
+ }
+
+ def getLocalConnectionId = {
+ val pos = id.indexOf(':')
+ assert(pos >0 )
+ id.slice(id.offset, pos).toString
+ }
+
+ /* avoid paying the price of creating the header index. lots of times we don't need it */
+ lazy val headerIndex: Map[AsciiBuffer, AsciiBuffer] = {
+ var rc = Map[AsciiBuffer, AsciiBuffer]()
+ for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
+ rc += (header._1 -> header._2)
+ }
+ rc
+ }
+
+ def getProperty(name: String):AnyRef = {
+ (name match {
+ // TODO: handle more of the JMS Types that ActiveMQ 5 supports.
+ case "JMSMessageID" =>
+ Some(id)
+ case "JMSType" =>
+ headerIndex.get(ascii("type"))
+ case _=>
+ headerIndex.get(ascii(name))
+ }) match {
+ case Some(rc) => rc.utf8.toString
+ case None => null
+ }
+ }
+
}
+
+
object StompFrame extends Sizer[StompFrame] {
def size(value:StompFrame) = value.size
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 04:10:30 2010
@@ -30,7 +30,8 @@ import BufferConversions._
import StompFrameConstants._
import java.io.IOException
import org.apache.activemq.broker.store.StoreBatch
-
+import org.apache.activemq.selector.SelectorParser
+import org.apache.activemq.filter.{BooleanExpression, FilterException}
object StompConstants {
@@ -84,7 +85,7 @@ class StompProtocolHandler extends Proto
protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
- class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer, val selector:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
+ class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer, BooleanExpression)) extends BaseRetained with DeliveryConsumer {
val dispatchQueue = StompProtocolHandler.this.dispatchQueue
dispatchQueue.retain
@@ -95,7 +96,11 @@ class StompProtocolHandler extends Proto
def matches(delivery:Delivery) = {
if( delivery.message.protocol eq PROTOCOL ) {
- true
+ if( selector!=null ) {
+ selector._2.matches(delivery.message)
+ } else {
+ true
+ }
} else {
false
}
@@ -279,6 +284,9 @@ class StompProtocolHandler extends Proto
def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
var storeBatch:StoreBatch=null
+ // User might be asking for ack that we have processed the message.
+ val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
+
if( !route.targets.isEmpty ) {
// We may need to add some headers..
@@ -295,8 +303,6 @@ class StompProtocolHandler extends Proto
delivery.message = message
delivery.size = message.frame.size
- // User might be asking for ack that we have prcoessed the message..
- val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
if( receipt!=null ) {
delivery.ack = { storeTx =>
connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
@@ -314,6 +320,9 @@ class StompProtocolHandler extends Proto
} else {
// info("Dropping message. No consumers interested in message.")
+ if( receipt!=null ) {
+ connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+ }
}
@@ -341,9 +350,15 @@ class StompProtocolHandler extends Proto
val selector = get(headers, Headers.Subscribe.SELECTOR) match {
case None=> null
case Some(x)=> x
+ try {
+ (x, SelectorParser.parse(x.utf8.toString))
+ } catch {
+ case e:FilterException =>
+ die("Invalid selector expression: "+e.getMessage)
+ null
+ }
}
-
consumers.get(id) match {
case None=>
info("subscribing to: %s", destiantion)
@@ -368,7 +383,9 @@ class StompProtocolHandler extends Proto
case Some(ack) =>
ack(null)
case None =>
- die("The specified message id is not waiting for a client ack: "+messageId)
+ // This can easily happen if the consumer is doing client acks on something like
+ // a non-durable topic.
+ // trace("The specified message id is not waiting for a client ack: %s", messageId)
}
case None=> die("message id header not set")
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul 7 04:10:30 2010
@@ -50,7 +50,9 @@ object StompLoadClient {
var useContentLength=true
var persistent = false;
var syncSend = false;
+ var headers = List[String]()
var ack = "client";
+ var selector:String = null
var destinationType = "queue";
var destinationCount = 1;
@@ -105,7 +107,6 @@ object StompLoadClient {
System.in.read()
- println("=======================")
done.set(true)
// wait for the threads to finish..
@@ -122,6 +123,7 @@ object StompLoadClient {
sampleThread.interrupt
sampleThread.join
+ println("=======================")
println("Shutdown");
println("=======================")
@@ -134,7 +136,7 @@ object StompLoadClient {
"uri = "+uri+"\n"+
"destinationType = "+destinationType+"\n"+
"destinationCount = "+destinationCount+"\n" +
- "sampleInterval = "+sampleInterval+"\n"
+ "sampleInterval = "+sampleInterval+"\n" +
"\n"+
"--- Producer Properties ---\n"+
"producers = "+producers+"\n"+
@@ -143,11 +145,13 @@ object StompLoadClient {
"syncSend = "+syncSend+"\n"+
"useContentLength = "+useContentLength+"\n"+
"producerSleep = "+producerSleep+"\n"+
+ "headers = "+headers+"\n"+
"\n"+
"--- Consumer Properties ---\n"+
"consumers = "+consumers+"\n"+
"consumerSleep = "+consumerSleep+"\n"+
"ack = "+ack+"\n"+
+ "selector = "+selector+"\n"+
""
}
@@ -283,6 +287,7 @@ object StompLoadClient {
"destination:"+destination(id)+"\n"+
{ if(persistent) "persistent:true\n" else "" } +
{ if(syncSend) "receipt:xxx\n" else "" } +
+ { headers.foldLeft("") { case (sum, v)=> sum+v+"\n" } } +
{ if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
"\n"+message(name)).getBytes("UTF-8")
@@ -334,7 +339,10 @@ object StompLoadClient {
this.client=client
val headers = Map[AsciiBuffer, AsciiBuffer]();
client.send("""
-SUBSCRIBE
+SUBSCRIBE""" + (if(selector==null) {""} else {
+"""
+selector: """+selector
+}) + """
ack:"""+ack+"""
destination:"""+destination(id)+"""
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml Wed Jul 7 04:10:30 2010
@@ -83,6 +83,13 @@
<artifactId>commons-cli</artifactId>
<version>1.0</version>
<optional>true</optional>
+
+ <exclusions>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- Scala Support -->