You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:10:30 UTC

svn commit: r961143 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/ activemq-selector/src/main/java/org/apache/active...

Author: chirino
Date: Wed Jul  7 04:10:30 2010
New Revision: 961143

URL: http://svn.apache.org/viewvc?rev=961143&view=rev
Log:
initial pass at implementing selectors for stomp.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Filterable.java
      - copied, changed from r961142, activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java
    activemq/sandbox/activemq-apollo-actor/activemq-selector/src/test/java/org/apache/activemq/selector/SelectorTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 04:10:30 2010
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.org.apache.activemq.filter.{MessageEvaluationContext}
+import _root_.org.apache.activemq.filter.{Filterable}
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
@@ -73,7 +73,7 @@ trait DeliverySession extends Sink[Deliv
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait Message {
+trait Message extends Filterable {
 
   /**
    * the globally unique id of the message
@@ -107,11 +107,6 @@ trait Message {
   def destination: Destination
 
   /**
-   * used to apply a selector against the message.
-   */
-  def messageEvaluationContext:MessageEvaluationContext
-
-  /**
    * The protocol encoding of the message.
    */
   def protocol:AsciiBuffer

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 04:10:30 2010
@@ -298,14 +298,24 @@ class DeliveryProducerRoute(val router:R
     if( full ) {
       false
     } else {
-      if( delivery.message.persistent && router.host.store!=null ) {
-        delivery.storeBatch = router.host.store.createStoreBatch
-        delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
-      }
+
+      // Do we need to store the message if we have a matching consumer?
+      var storeOnMatch = delivery.message.persistent && router.host.store!=null
 
       targets.foreach { target=>
-        if( !target.offer(delivery) ) {
-          overflowSessions ::= target
+
+        // only deliver to matching consumers
+        if( target.consumer.matches(delivery) ) {
+          
+          if( storeOnMatch ) {
+            delivery.storeBatch = router.host.store.createStoreBatch
+            delivery.storeKey = delivery.storeBatch.store(delivery.createMessageRecord)
+            storeOnMatch = false
+          }
+
+          if( !target.offer(delivery) ) {
+            overflowSessions ::= target
+          }
         }
       }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jul  7 04:10:30 2010
@@ -303,12 +303,10 @@ class DurableSubscription(val host:Virtu
 //        }
     }
 
-    def matches(message:Delivery) = {
+    def matches(delivery:Delivery) = {
         if (selector != null) {
-          var selectorContext = message.message.messageEvaluationContext
-          selectorContext.setDestination(destination);
           try {
-              (selector.matches(selectorContext));
+              (selector.matches( delivery.message ));
           } catch {
             case e:FilterException=>
               e.printStackTrace();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java Wed Jul  7 04:10:30 2010
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import org.apache.activemq.apollo.broker.Destination;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.Filterable;
 import org.fusesource.hawtbuf.AsciiBuffer;
 
 
@@ -31,18 +31,17 @@ import org.fusesource.hawtbuf.AsciiBuffe
  * 
  * @version $Revision: 1.3 $
  */
-public abstract class PathFilter implements BooleanExpression {
+public abstract class PathFilter {
 
     public static final AsciiBuffer ANY_DESCENDENT = new AsciiBuffer(">");
     public static final AsciiBuffer ANY_CHILD = new AsciiBuffer("*");
     
-	public boolean matches(MessageEvaluationContext message) throws FilterException {
-		Destination destination = message.getDestination();
+	public boolean matches(Destination destination) throws FilterException {
 		return matches(destination.getName());
 	}
 	
-	public Object evaluate(MessageEvaluationContext message) throws FilterException {
-		return matches(message) ? Boolean.TRUE : Boolean.FALSE;
+	public Object evaluate(Destination destination) throws FilterException {
+		return matches(destination) ? Boolean.TRUE : Boolean.FALSE;
 	}
 	
     public abstract boolean matches(AsciiBuffer path);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ArithmeticExpression.java Wed Jul  7 04:10:30 2010
@@ -181,7 +181,7 @@ public abstract class ArithmeticExpressi
         }
     }
 
-    public Object evaluate(MessageEvaluationContext message) throws FilterException {
+    public Object evaluate(Filterable message) throws FilterException {
         Object lvalue = left.evaluate(message);
         if (lvalue == null) {
             return null;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java Wed Jul  7 04:10:30 2010
@@ -31,6 +31,6 @@ public interface BooleanExpression exten
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws FilterException
      */
-    boolean matches(MessageEvaluationContext message) throws FilterException;
+    boolean matches(Filterable message) throws FilterException;
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ComparisonExpression.java Wed Jul  7 04:10:30 2010
@@ -118,9 +118,9 @@ public abstract class ComparisonExpressi
         }
 
         /**
-         * @see org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
+         * @see org.apache.activemq.filter.Expression#evaluate(Filterable)
          */
-        public Object evaluate(MessageEvaluationContext message) throws FilterException {
+        public Object evaluate(Filterable message) throws FilterException {
 
             Object rv = this.getRight().evaluate(message);
 
@@ -137,7 +137,7 @@ public abstract class ComparisonExpressi
             return likePattern.matcher((String)rv).matches() ? Boolean.TRUE : Boolean.FALSE;
         }
 
-        public boolean matches(MessageEvaluationContext message) throws FilterException {
+        public boolean matches(Filterable message) throws FilterException {
             Object object = evaluate(message);
             return object != null && object == Boolean.TRUE;
         }
@@ -199,7 +199,7 @@ public abstract class ComparisonExpressi
     private static BooleanExpression doCreateEqual(Expression left, Expression right) {
         return new ComparisonExpression(left, right) {
 
-            public Object evaluate(MessageEvaluationContext message) throws FilterException {
+            public Object evaluate(Filterable message) throws FilterException {
                 Object lv = left.evaluate(message);
                 Object rv = right.evaluate(message);
 
@@ -332,7 +332,7 @@ public abstract class ComparisonExpressi
         }
     }
 
-    public Object evaluate(MessageEvaluationContext message) throws FilterException {
+    public Object evaluate(Filterable message) throws FilterException {
         Comparable<Comparable> lv = (Comparable)left.evaluate(message);
         if (lv == null) {
             return null;
@@ -425,7 +425,7 @@ public abstract class ComparisonExpressi
 
     protected abstract boolean asBoolean(int answer);
 
-    public boolean matches(MessageEvaluationContext message) throws FilterException {
+    public boolean matches(Filterable message) throws FilterException {
         Object object = evaluate(message);
         return object != null && object == Boolean.TRUE;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/ConstantExpression.java Wed Jul  7 04:10:30 2010
@@ -31,7 +31,7 @@ public class ConstantExpression implemen
             super(value);
         }
 
-        public boolean matches(MessageEvaluationContext message) throws FilterException {
+        public boolean matches(Filterable message) throws FilterException {
             Object object = evaluate(message);
             return object != null && object == Boolean.TRUE;
         }
@@ -92,7 +92,7 @@ public class ConstantExpression implemen
         return new ConstantExpression(value);
     }
 
-    public Object evaluate(MessageEvaluationContext message) throws FilterException {
+    public Object evaluate(Filterable message) throws FilterException {
         return value;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Expression.java Wed Jul  7 04:10:30 2010
@@ -29,6 +29,6 @@ public interface Expression {
     /**
      * @return the value of this expression
      */
-    Object evaluate(MessageEvaluationContext message) throws FilterException;
+    Object evaluate(Filterable message) throws FilterException;
     
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Filterable.java (from r961142, activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Filterable.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Filterable.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java&r1=961142&r2=961143&rev=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/BooleanExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/Filterable.java Wed Jul  7 04:10:30 2010
@@ -16,21 +16,38 @@
  */
 package org.apache.activemq.filter;
 
-
-
 /**
- * A BooleanExpression is an expression that always
- * produces a Boolean result.
- *
- * @version $Revision: 1.2 $
+ * A Filterable is the object being evaluated by the filters.  It provides
+ * access to filtered properties.
+ * 
+ * @version $Revision: 1.4 $
  */
-public interface BooleanExpression extends Expression {
-    
+public interface Filterable {
+
     /**
-     * @param message
-     * @return true if the expression evaluates to Boolean.TRUE.
+     * This method is used by message filters which do content-based routing (like the XPath
+     * based selectors).
+     *
+     * @param <T>
+     * @param type
+     * @return
      * @throws FilterException
      */
-    boolean matches(MessageEvaluationContext message) throws FilterException;
+    <T> T getBodyAs(Class<T> type) throws FilterException;
+
+    /**
+     * Extracts the named message property
+     *
+     * @param name
+     * @return
+     */
+    Object getProperty(String name);
+
+    /**
+     * Used by the NoLocal filter.
+     *
+     * @return a unique id for the connection that produced the message.
+     */
+    Object getLocalConnectionId();
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/LogicExpression.java Wed Jul  7 04:10:30 2010
@@ -35,7 +35,7 @@ public abstract class LogicExpression ex
     public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) {
         return new LogicExpression(lvalue, rvalue) {
 
-            public Object evaluate(MessageEvaluationContext message) throws FilterException {
+            public Object evaluate(Filterable message) throws FilterException {
 
                 Boolean lv = (Boolean)left.evaluate(message);
                 // Can we do an OR shortcut??
@@ -56,7 +56,7 @@ public abstract class LogicExpression ex
     public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) {
         return new LogicExpression(lvalue, rvalue) {
 
-            public Object evaluate(MessageEvaluationContext message) throws FilterException {
+            public Object evaluate(Filterable message) throws FilterException {
 
                 Boolean lv = (Boolean)left.evaluate(message);
 
@@ -78,9 +78,9 @@ public abstract class LogicExpression ex
         };
     }
 
-    public abstract Object evaluate(MessageEvaluationContext message) throws FilterException;
+    public abstract Object evaluate(Filterable message) throws FilterException;
 
-    public boolean matches(MessageEvaluationContext message) throws FilterException {
+    public boolean matches(Filterable message) throws FilterException {
         Object object = evaluate(message);
         return object != null && object == Boolean.TRUE;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/PropertyExpression.java Wed Jul  7 04:10:30 2010
@@ -26,17 +26,13 @@ package org.apache.activemq.filter;
 public class PropertyExpression implements Expression {
 
     private final String name;
-    private Expression expression;
 
     public PropertyExpression(String name) {
         this.name = name;
     }
 
-    public Object evaluate(MessageEvaluationContext message) throws FilterException {
-        if (expression == null) {
-            expression = message.getPropertyExpression(name);
-        }
-        return expression.evaluate(message);
+    public Object evaluate(Filterable message) throws FilterException {
+        return message.getProperty(name);
     }
 
     public String getName() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/UnaryExpression.java Wed Jul  7 04:10:30 2010
@@ -39,7 +39,7 @@ public abstract class UnaryExpression im
 
     public static Expression createNegate(Expression left) {
         return new UnaryExpression(left) {
-            public Object evaluate(MessageEvaluationContext message) throws FilterException {
+            public Object evaluate(Filterable message) throws FilterException {
                 Object rvalue = right.evaluate(message);
                 if (rvalue == null) {
                     return null;
@@ -70,7 +70,7 @@ public abstract class UnaryExpression im
         final Collection<Object> inList = t;
 
         return new BooleanUnaryExpression(right) {
-            public Object evaluate(MessageEvaluationContext message) throws FilterException {
+            public Object evaluate(Filterable message) throws FilterException {
 
                 Object rvalue = right.evaluate(message);
                 if (rvalue == null) {
@@ -124,7 +124,7 @@ public abstract class UnaryExpression im
             super(left);
         }
 
-        public boolean matches(MessageEvaluationContext message) throws FilterException {
+        public boolean matches(Filterable message) throws FilterException {
             Object object = evaluate(message);
             return object != null && object == Boolean.TRUE;
         }
@@ -132,7 +132,7 @@ public abstract class UnaryExpression im
 
     public static BooleanExpression createNOT(BooleanExpression left) {
         return new BooleanUnaryExpression(left) {
-            public Object evaluate(MessageEvaluationContext message) throws FilterException {
+            public Object evaluate(Filterable message) throws FilterException {
                 Boolean lvalue = (Boolean)right.evaluate(message);
                 if (lvalue == null) {
                     return null;
@@ -156,7 +156,7 @@ public abstract class UnaryExpression im
 
     public static BooleanExpression createBooleanCast(Expression left) {
         return new BooleanUnaryExpression(left) {
-            public Object evaluate(MessageEvaluationContext message) throws FilterException {
+            public Object evaluate(Filterable message) throws FilterException {
                 Object rvalue = right.evaluate(message);
                 if (rvalue == null) {
                     return null;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XPathExpression.java Wed Jul  7 04:10:30 2010
@@ -58,7 +58,7 @@ public final class XPathExpression imple
     private final XPathEvaluator evaluator;
 
     public static interface XPathEvaluator {
-        boolean evaluate(MessageEvaluationContext message) throws FilterException;
+        boolean evaluate(Filterable message) throws FilterException;
     }
 
     XPathExpression(String xpath) {
@@ -88,7 +88,7 @@ public final class XPathExpression imple
         }
     }
 
-    public Object evaluate(MessageEvaluationContext message) throws FilterException {
+    public Object evaluate(Filterable message) throws FilterException {
         return evaluator.evaluate(message) ? Boolean.TRUE : Boolean.FALSE;
     }
 
@@ -101,7 +101,7 @@ public final class XPathExpression imple
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws FilterException
      */
-    public boolean matches(MessageEvaluationContext message) throws FilterException {
+    public boolean matches(Filterable message) throws FilterException {
         Object object = evaluate(message);
         return object != null && object == Boolean.TRUE;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XQueryExpression.java Wed Jul  7 04:10:30 2010
@@ -28,7 +28,7 @@ public final class XQueryExpression impl
         this.xpath = xpath;
     }
 
-    public Object evaluate(MessageEvaluationContext message) throws FilterException {
+    public Object evaluate(Filterable message) throws FilterException {
         return Boolean.FALSE;
     }
 
@@ -41,7 +41,7 @@ public final class XQueryExpression impl
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws FilterException
      */
-    public boolean matches(MessageEvaluationContext message) throws FilterException {
+    public boolean matches(Filterable message) throws FilterException {
         Object object = evaluate(message);
         return object != null && object == Boolean.TRUE;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/main/java/org/apache/activemq/filter/XalanXPathEvaluator.java Wed Jul  7 04:10:30 2010
@@ -39,7 +39,7 @@ public class XalanXPathEvaluator impleme
         this.xpath = xpath;
     }
 
-    public boolean evaluate(MessageEvaluationContext m) throws FilterException {
+    public boolean evaluate(Filterable m) throws FilterException {
         String stringBody = m.getBodyAs(String.class);
         if (stringBody!=null) {
             return evaluate(stringBody);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-selector/src/test/java/org/apache/activemq/selector/SelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-selector/src/test/java/org/apache/activemq/selector/SelectorTest.java?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-selector/src/test/java/org/apache/activemq/selector/SelectorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-selector/src/test/java/org/apache/activemq/selector/SelectorTest.java Wed Jul  7 04:10:30 2010
@@ -23,14 +23,14 @@ import junit.framework.TestCase;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.Expression;
 import org.apache.activemq.filter.FilterException;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.Filterable;
 
 /**
  * @version $Revision: 1.7 $
  */
 public class SelectorTest extends TestCase {
 		
-    class MockMessage implements MessageEvaluationContext {
+    class MockMessage implements Filterable {
 
     	HashMap<String, Object> properties = new HashMap<String, Object>();
 		private String text;
@@ -95,7 +95,17 @@ public class SelectorTest extends TestCa
 			return null;
 		}
 
-		public <T> T getDestination() {
+        public Object getProperty(String name) {
+            if( "JMSType".equals(name) ) {
+                return type;
+            }
+            if( "JMSMessageID".equals(name) ) {
+                return messageId;
+            }
+            return properties.get(name);
+        }
+
+        public <T> T getDestination() {
 			return (T)destination;
 		}
 
@@ -103,20 +113,6 @@ public class SelectorTest extends TestCa
 			return localConnectionId;
 		}
 
-		public Expression getPropertyExpression(final String name) {
-			return new Expression() {
-		        public Object evaluate(MessageEvaluationContext mc) throws FilterException {
-	                MockMessage mockMessage = (MockMessage)mc;
-	    			if( "JMSType".equals(name) ) {
-	    				return mockMessage.type;
-	    			}
-	    			if( "JMSMessageID".equals(name) ) {
-	    				return mockMessage.messageId;
-	    			}
-					return mockMessage.properties.get(name);
-		        }
-			};
-		}
     	
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul  7 04:10:30 2010
@@ -27,8 +27,16 @@ object StompBroker {
 
   var address = "0.0.0.0"
   var port = 61613
+  var storeType = "hawtdb"
+
+  def main(args:Array[String]) = run
+
+  def run = {
+    println("=======================")
+    println("Press ENTER to shutdown");
+    println("=======================")
+    println("")
 
-  def main(args:Array[String]) = {
     val uri = "tcp://"+address+":"+port
 
     println("Starting stomp broker: "+uri)
@@ -39,24 +47,49 @@ object StompBroker {
     connector.protocol = "stomp"
     connector.advertise = uri
 
-//    val store = new CassandraStoreDTO
-//    store.hosts.add("localhost:9160")
-
-    val store = new HawtDBStoreDTO
-    store.directory = new File("activemq-data")
-    
+    val store = storeType match {
+      case "none" =>
+        null
+
+      case "hawtdb" =>
+        val rc = new HawtDBStoreDTO
+        rc.directory = new File("activemq-data")
+        rc
+
+      case "cassandra" =>
+        val rc = new CassandraStoreDTO
+        rc.hosts.add("localhost:9160")
+        rc
+    }
     broker.config.virtualHosts.get(0).store = store
 
-    val tracker = new LoggingTracker("broker startup")
+
+
+    var tracker = new LoggingTracker("broker startup")
     tracker.start(broker)
     tracker.await
     println("Startup complete.")
 
     System.in.read
+
     println("Shutting down...")
-    broker.stop
-    println("Shutdown complete.")
+    tracker = new LoggingTracker("broker shutdown")
+    tracker.stop(broker)
+    tracker.await
+    
+    println("=======================")
+    println("Shutdown");
+    println("=======================")
+
   }
 
-  
+  override def toString() = {
+    "--------------------------------------\n"+
+    "StompBroker Properties\n"+
+    "--------------------------------------\n"+
+    "address          = "+address+"\n"+
+    "port             = "+port+"\n"+
+    "storeType        = "+storeType+"\n" +
+    ""
+  }  
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 04:10:30 2010
@@ -17,10 +17,11 @@
 package org.apache.activemq.apollo.stomp
 
 import _root_.java.util.LinkedList
-import _root_.org.apache.activemq.filter.{Expression, MessageEvaluationContext}
+import _root_.org.apache.activemq.filter.{Expression, Filterable}
 import _root_.org.fusesource.hawtbuf._
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.broker.{Sizer, Destination, BufferConversions, Message}
+import java.lang.{String, Class}
 
 /**
  *
@@ -36,6 +37,7 @@ object StompFrameConstants {
 import StompFrameConstants._
 import StompConstants._;
 import BufferConversions._
+import Buffer._
 
 case class StompFrameMessage(frame:StompFrame) extends Message {
   
@@ -72,33 +74,6 @@ case class StompFrameMessage(frame:Stomp
    */
   var destination: Destination = null
 
-  /**
-   * used to apply a selector against the message.
-   */
-  lazy val messageEvaluationContext = new MessageEvaluationContext() {
-
-    def getBodyAs[T](clazz:Class[T]) = {
-      throw new UnsupportedOperationException
-    }
-
-    def getPropertyExpression(name:String):Expression = {
-      throw new UnsupportedOperationException
-    }
-
-    @deprecated("this should go away.")
-    def getLocalConnectionId() = {
-      throw new UnsupportedOperationException
-    }
-
-    def getDestination[T]():T = {
-      throw new UnsupportedOperationException
-    }
-
-    def setDestination(destination:Any):Unit = {
-      throw new UnsupportedOperationException
-    }
-  }
-
   for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
     header match {
       case (Stomp.Headers.Message.MESSAGE_ID, value) =>
@@ -114,8 +89,55 @@ case class StompFrameMessage(frame:Stomp
       case _ =>
     }
   }
+
+  def getBodyAs[T](toType : Class[T]) = {
+    (if( toType == classOf[String] ) {
+      frame.content.utf8
+    } else if (toType == classOf[Buffer]) {
+      frame.content
+    } else if (toType == classOf[AsciiBuffer]) {
+      frame.content.ascii
+    } else if (toType == classOf[UTF8Buffer]) {
+      frame.content.utf8
+    } else {
+      null
+    }).asInstanceOf[T]
+  }
+
+  def getLocalConnectionId = {
+    val pos = id.indexOf(':')
+    assert(pos >0 )
+    id.slice(id.offset, pos).toString
+  }
+
+  /* avoid paying the price of creating the header index. lots of times we don't need it */
+  lazy val headerIndex: Map[AsciiBuffer, AsciiBuffer] =  {
+    var rc = Map[AsciiBuffer, AsciiBuffer]()
+    for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
+      rc += (header._1 -> header._2)
+    }
+    rc
+  }
+
+  def getProperty(name: String):AnyRef = {
+    (name match {
+      // TODO: handle more of the JMS Types that ActiveMQ 5 supports.
+      case "JMSMessageID" =>
+        Some(id)
+      case "JMSType" =>
+        headerIndex.get(ascii("type"))
+      case _=>
+        headerIndex.get(ascii(name))
+    }) match {
+      case Some(rc) => rc.utf8.toString
+      case None => null
+    }
+  }
+
 }
 
+
+
 object StompFrame extends Sizer[StompFrame] {
   def size(value:StompFrame) = value.size   
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 04:10:30 2010
@@ -30,7 +30,8 @@ import BufferConversions._
 import StompFrameConstants._
 import java.io.IOException
 import org.apache.activemq.broker.store.StoreBatch
-
+import org.apache.activemq.selector.SelectorParser
+import org.apache.activemq.filter.{BooleanExpression, FilterException}
 
 object StompConstants {
 
@@ -84,7 +85,7 @@ class StompProtocolHandler extends Proto
   
   protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
-  class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer, val selector:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
+  class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer, BooleanExpression)) extends BaseRetained with DeliveryConsumer {
     val dispatchQueue = StompProtocolHandler.this.dispatchQueue
 
     dispatchQueue.retain
@@ -95,7 +96,11 @@ class StompProtocolHandler extends Proto
 
     def matches(delivery:Delivery) = {
       if( delivery.message.protocol eq PROTOCOL ) {
-        true
+        if( selector!=null ) {
+          selector._2.matches(delivery.message)
+        } else {
+          true
+        }
       } else {
         false
       }
@@ -279,6 +284,9 @@ class StompProtocolHandler extends Proto
 
   def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
     var storeBatch:StoreBatch=null
+    // User might be asking for ack that we have processed the message..
+    val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
+
     if( !route.targets.isEmpty ) {
 
       // We may need to add some headers..
@@ -295,8 +303,6 @@ class StompProtocolHandler extends Proto
       delivery.message = message
       delivery.size = message.frame.size
 
-      // User might be asking for ack that we have prcoessed the message..
-      val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
       if( receipt!=null ) {
         delivery.ack = { storeTx =>
           connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
@@ -314,6 +320,9 @@ class StompProtocolHandler extends Proto
 
     } else {
       // info("Dropping message.  No consumers interested in message.")
+      if( receipt!=null ) {
+        connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt))))
+      }
     }
 
 
@@ -341,9 +350,15 @@ class StompProtocolHandler extends Proto
         val selector = get(headers, Headers.Subscribe.SELECTOR) match {
           case None=> null
           case Some(x)=> x
+            try {
+              (x, SelectorParser.parse(x.utf8.toString))
+            } catch {
+              case e:FilterException =>
+                die("Invalid selector expression: "+e.getMessage)
+              null
+            }
         }
 
-
         consumers.get(id) match {
           case None=>
             info("subscribing to: %s", destiantion)
@@ -368,7 +383,9 @@ class StompProtocolHandler extends Proto
           case Some(ack) =>
             ack(null)
           case None =>
-            die("The specified message id is not waiting for a client ack: "+messageId)
+            // This can easily happen if the consumer is doing client acks on something like
+            // a non-durable topic.
+            // trace("The specified message id is not waiting for a client ack: %s", messageId)
         }
       case None=> die("message id header not set")
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul  7 04:10:30 2010
@@ -50,7 +50,9 @@ object StompLoadClient {
   var useContentLength=true
   var persistent = false;
   var syncSend = false;
+  var headers = List[String]()
   var ack = "client";
+  var selector:String = null
 
   var destinationType = "queue";
   var destinationCount = 1;
@@ -105,7 +107,6 @@ object StompLoadClient {
 
 
     System.in.read()
-    println("=======================")
     done.set(true)
 
     // wait for the threads to finish..
@@ -122,6 +123,7 @@ object StompLoadClient {
     sampleThread.interrupt
     sampleThread.join
 
+    println("=======================")
     println("Shutdown");
     println("=======================")
 
@@ -134,7 +136,7 @@ object StompLoadClient {
     "uri              = "+uri+"\n"+
     "destinationType  = "+destinationType+"\n"+
     "destinationCount = "+destinationCount+"\n" +
-    "sampleInterval   = "+sampleInterval+"\n"
+    "sampleInterval   = "+sampleInterval+"\n" +
     "\n"+
     "--- Producer Properties ---\n"+
     "producers        = "+producers+"\n"+
@@ -143,11 +145,13 @@ object StompLoadClient {
     "syncSend         = "+syncSend+"\n"+
     "useContentLength = "+useContentLength+"\n"+
     "producerSleep    = "+producerSleep+"\n"+
+    "headers          = "+headers+"\n"+
     "\n"+
     "--- Consumer Properties ---\n"+
     "consumers        = "+consumers+"\n"+
     "consumerSleep    = "+consumerSleep+"\n"+
     "ack              = "+ack+"\n"+
+    "selector         = "+selector+"\n"+
     ""
 
   }
@@ -283,6 +287,7 @@ object StompLoadClient {
               "destination:"+destination(id)+"\n"+
                { if(persistent) "persistent:true\n" else "" } +
                { if(syncSend) "receipt:xxx\n" else "" } +
+               { headers.foldLeft("") { case (sum, v)=> sum+v+"\n" } } +
                { if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
               "\n"+message(name)).getBytes("UTF-8")
 
@@ -334,7 +339,10 @@ object StompLoadClient {
           this.client=client
           val headers = Map[AsciiBuffer, AsciiBuffer]();
           client.send("""
-SUBSCRIBE
+SUBSCRIBE""" + (if(selector==null) {""} else {
+"""
+selector: """+selector
+}) + """
 ack:"""+ack+"""
 destination:"""+destination(id)+"""
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml?rev=961143&r1=961142&r2=961143&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/pom.xml Wed Jul  7 04:10:30 2010
@@ -83,6 +83,13 @@
       <artifactId>commons-cli</artifactId>
       <version>1.0</version>
       <optional>true</optional>
+
+      <exclusions>
+        <exclusion>
+         <groupId>commons-lang</groupId>
+         <artifactId>commons-lang</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
                 
     <!-- Scala Support -->