You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/11 17:22:05 UTC

svn commit: r655323 [2/4] - in /incubator/qpid/branches/broker-queue-refactor/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main...

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java Sun May 11 08:22:03 2008
@@ -22,71 +22,22 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * A filter performing a comparison of two objects
  */
-public abstract class LogicExpression extends BinaryExpression implements BooleanExpression
+public abstract class LogicExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
 {
 
-    public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue)
+    public static<E extends Exception> BooleanExpression createOR(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
     {
-        return new LogicExpression(lvalue, rvalue)
-            {
-
-                public Object evaluate(AMQMessage message) throws AMQException
-                {
-
-                    Boolean lv = (Boolean) left.evaluate(message);
-                    // Can we do an OR shortcut??
-                    if ((lv != null) && lv.booleanValue())
-                    {
-                        return Boolean.TRUE;
-                    }
-
-                    Boolean rv = (Boolean) right.evaluate(message);
-
-                    return (rv == null) ? null : rv;
-                }
-
-                public String getExpressionSymbol()
-                {
-                    return "OR";
-                }
-            };
+        return new OrExpression(lvalue, rvalue);
     }
 
-    public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue)
+    public static<E extends Exception> BooleanExpression createAND(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
     {
-        return new LogicExpression(lvalue, rvalue)
-            {
-
-                public Object evaluate(AMQMessage message) throws AMQException
-                {
-
-                    Boolean lv = (Boolean) left.evaluate(message);
-
-                    // Can we do an AND shortcut??
-                    if (lv == null)
-                    {
-                        return null;
-                    }
-
-                    if (!lv.booleanValue())
-                    {
-                        return Boolean.FALSE;
-                    }
-
-                    Boolean rv = (Boolean) right.evaluate(message);
-
-                    return (rv == null) ? null : rv;
-                }
-
-                public String getExpressionSymbol()
-                {
-                    return "AND";
-                }
-            };
+        return new AndExpression(lvalue, rvalue);
     }
 
     /**
@@ -98,13 +49,74 @@
         super(left, right);
     }
 
-    public abstract Object evaluate(AMQMessage message) throws AMQException;
+    public abstract Object evaluate(Filterable<E> message) throws E;
 
-    public boolean matches(AMQMessage message) throws AMQException
+    public boolean matches(Filterable<E> message) throws E
     {
         Object object = evaluate(message);
 
         return (object != null) && (object == Boolean.TRUE);
     }
 
+    private static class OrExpression<E extends Exception> extends LogicExpression<E>
+    {
+        public OrExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+        {
+            super(lvalue, rvalue);
+        }
+
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            Boolean lv = (Boolean) left.evaluate(message);
+            // Can we do an OR shortcut??
+            if ((lv != null) && lv.booleanValue())
+            {
+                return Boolean.TRUE;
+            }
+
+            Boolean rv = (Boolean) right.evaluate(message);
+
+            return (rv == null) ? null : rv;
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "OR";
+        }
+    }
+
+    private static class AndExpression<E extends Exception> extends LogicExpression<E>
+    {
+        public AndExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+        {
+            super(lvalue, rvalue);
+        }
+
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            Boolean lv = (Boolean) left.evaluate(message);
+
+            // Can we do an AND shortcut??
+            if (lv == null)
+            {
+                return null;
+            }
+
+            if (!lv.booleanValue())
+            {
+                return Boolean.FALSE;
+            }
+
+            Boolean rv = (Boolean) right.evaluate(message);
+
+            return (rv == null) ? null : rv;
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "AND";
+        }
+    }
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Sun May 11 08:22:03 2008
@@ -22,8 +22,9 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
-public interface MessageFilter
+public interface MessageFilter<E extends Exception>
 {
-    boolean matches(AMQMessage message) throws AMQException;
+    boolean matches(Filterable<E> message) throws E;
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java Sun May 11 08:22:03 2008
@@ -22,7 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 public class NoConsumerFilter implements MessageFilter
 {
@@ -34,7 +34,7 @@
         _logger.info("Created NoConsumerFilter");
     }
 
-    public boolean matches(AMQMessage message)
+    public boolean matches(Filterable message)
     {
        return true;
     }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Sun May 11 08:22:03 2008
@@ -30,12 +30,12 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * Represents a property  expression
  */
-public class PropertyExpression implements Expression
+public class PropertyExpression<E extends Exception> implements Expression<E>
 {
     // Constants - defined the same as JMS
     private static final int NON_PERSISTENT = 1;
@@ -44,223 +44,60 @@
 
     private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
 
-    private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>();
+    private static final HashMap<String, Expression<? extends Exception>> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression<? extends Exception>>();
 
-    static
     {
-        JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression<E>()
                                      {
-                                         public Object evaluate(AMQMessage message)
+                                         public Object evaluate(Filterable<E> message)
                                          {
                                              //TODO
                                              return null;
                                          }
                                      });
-        JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-                                                 AMQShortString replyTo = _properties.getReplyTo();
-
-                                                 return (replyTo == null) ? null : replyTo.toString();
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-
-                                                 return null;
-                                             }
-
-                                         }
-
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("JMSType", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-                                                 AMQShortString type = _properties.getType();
-
-                                                 return (type == null) ? null : type.toString();
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-
-                                                 return null;
-                                             }
-
-                                         }
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-                                             try
-                                             {
-                                                 int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
-                                                 if (_logger.isDebugEnabled())
-                                                 {
-                                                     _logger.debug("JMSDeliveryMode is :" + mode);
-                                                 }
-
-                                                 return mode;
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-                                             }
+        JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new ReplyToExpression());
 
-                                             return NON_PERSISTENT;
-                                         }
-                                     });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSType", new TypeExpression());
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-
-                                                 return (int) _properties.getPriority();
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-                                             }
+        JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new DeliveryModeExpression());
 
-                                             return DEFAULT_PRIORITY;
-                                         }
-                                     });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new PriorityExpression());
 
-        JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
+        JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new MessageIDExpression());
 
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-                                                 AMQShortString messageId = _properties.getMessageId();
-
-                                                 return (messageId == null) ? null : messageId;
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
+        JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new TimestampExpression());
 
-                                                 return null;
-                                             }
+        JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new CorrelationIdExpression());
 
-                                         }
-                                     });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression());
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new Expression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression<E>()
                                      {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-
-                                                 return _properties.getTimestamp();
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-
-                                                 return null;
-                                             }
-
-                                         }
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-                                                 AMQShortString correlationId = _properties.getCorrelationId();
-
-                                                 return (correlationId == null) ? null : correlationId.toString();
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-
-                                                 return null;
-                                             }
-
-                                         }
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
-                                         {
-
-                                             try
-                                             {
-                                                 CommonContentHeaderProperties _properties =
-                                                     (CommonContentHeaderProperties)
-                                                         message.getContentHeaderBody().properties;
-
-                                                 return _properties.getExpiration();
-                                             }
-                                             catch (AMQException e)
-                                             {
-                                                 _logger.warn(e);
-
-                                                 return null;
-                                             }
-
-                                         }
-                                     });
-
-        JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression()
-                                     {
-                                         public Object evaluate(AMQMessage message)
+                                         public Object evaluate(Filterable message) throws E
                                          {
                                              return message.isRedelivered();
                                          }
                                      });
-
     }
 
     private final String name;
-    private final Expression jmsPropertyExpression;
+    private final Expression<E> jmsPropertyExpression;
+
+    public boolean outerTest()
+    {
+        return false;
+    }
 
     public PropertyExpression(String name)
     {
         this.name = name;
-        jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name);
+
+        
+
+        jmsPropertyExpression = (Expression<E>) JMS_PROPERTY_EXPRESSIONS.get(name);
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException
+    public Object evaluate(Filterable<E> message) throws E
     {
 
         if (jmsPropertyExpression != null)
@@ -319,4 +156,113 @@
 
     }
 
+    private static class ReplyToExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+            AMQShortString replyTo = _properties.getReplyTo();
+
+            return (replyTo == null) ? null : replyTo.toString();
+
+        }
+
+    }
+
+    private static class TypeExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+                CommonContentHeaderProperties _properties =
+                    (CommonContentHeaderProperties)
+                        message.getContentHeaderBody().properties;
+                AMQShortString type = _properties.getType();
+
+                return (type == null) ? null : type.toString();
+
+        }
+    }
+
+    private static class DeliveryModeExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+                int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("JMSDeliveryMode is :" + mode);
+                }
+
+                return mode;
+        }
+    }
+
+    private static class PriorityExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+
+            return (int) _properties.getPriority();
+        }
+    }
+
+    private static class MessageIDExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+            AMQShortString messageId = _properties.getMessageId();
+
+            return (messageId == null) ? null : messageId;
+
+        }
+    }
+
+    private static class TimestampExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+
+            return _properties.getTimestamp();
+        }
+    }
+
+    private static class CorrelationIdExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+            AMQShortString correlationId = _properties.getCorrelationId();
+
+            return (correlationId == null) ? null : correlationId.toString();
+        }
+    }
+
+    private static class ExpirationExpression<E extends Exception> implements Expression<E>
+    {
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            CommonContentHeaderProperties _properties =
+                (CommonContentHeaderProperties)
+                    message.getContentHeaderBody().properties;
+
+            return _properties.getExpiration();
+
+        }
+    }
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java Sun May 11 08:22:03 2008
@@ -25,32 +25,33 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
-public class SimpleFilterManager implements FilterManager
+public class SimpleFilterManager implements FilterManager<AMQException>
 {
     private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
 
-    private final ConcurrentLinkedQueue<MessageFilter> _filters;
+    private final ConcurrentLinkedQueue<MessageFilter<AMQException>> _filters;
 
     public SimpleFilterManager()
     {
         _logger.debug("Creating SimpleFilterManager");
-        _filters = new ConcurrentLinkedQueue<MessageFilter>();
+        _filters = new ConcurrentLinkedQueue<MessageFilter<AMQException>>();
     }
 
-    public void add(MessageFilter filter)
+    public void add(MessageFilter<AMQException> filter)
     {
         _filters.add(filter);
     }
 
-    public void remove(MessageFilter filter)
+    public void remove(MessageFilter<AMQException> filter)
     {
         _filters.remove(filter);
     }
 
-    public boolean allAllow(AMQMessage msg)
+    public boolean allAllow(Filterable<AMQException> msg)
     {
-        for (MessageFilter filter : _filters)
+        for (MessageFilter<AMQException> filter : _filters)
         {
             try
             {

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java Sun May 11 08:22:03 2008
@@ -30,45 +30,23 @@
 import java.util.List;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 /**
  * An expression which performs an operation on two expression values
  */
-public abstract class UnaryExpression implements Expression
+public abstract class UnaryExpression<E extends Exception> implements Expression<E>
 {
 
     private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE);
-    protected Expression right;
+    protected Expression<E> right;
 
-    public static Expression createNegate(Expression left)
+    public static<E extends Exception> Expression<E> createNegate(Expression<E> left)
     {
-        return new UnaryExpression(left)
-        {
-            public Object evaluate(AMQMessage message) throws AMQException
-            {
-                Object rvalue = right.evaluate(message);
-                if (rvalue == null)
-                {
-                    return null;
-                }
-
-                if (rvalue instanceof Number)
-                {
-                    return negate((Number) rvalue);
-                }
-
-                return null;
-            }
-
-            public String getExpressionSymbol()
-            {
-                return "-";
-            }
-        };
+        return new NegativeExpression(left);
     }
 
-    public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not)
+    public static<E extends Exception> BooleanExpression createInExpression(PropertyExpression<E> right, List elements, final boolean not)
     {
 
         // Use a HashSet if there are many elements.
@@ -88,81 +66,17 @@
 
         final Collection inList = t;
 
-        return new BooleanUnaryExpression(right)
-        {
-            public Object evaluate(AMQMessage message) throws AMQException
-            {
-
-                Object rvalue = right.evaluate(message);
-                if (rvalue == null)
-                {
-                    return null;
-                }
-
-                if (rvalue.getClass() != String.class)
-                {
-                    return null;
-                }
-
-                if (((inList != null) && inList.contains(rvalue)) ^ not)
-                {
-                    return Boolean.TRUE;
-                }
-                else
-                {
-                    return Boolean.FALSE;
-                }
-
-            }
-
-            public String toString()
-            {
-                StringBuffer answer = new StringBuffer();
-                answer.append(right);
-                answer.append(" ");
-                answer.append(getExpressionSymbol());
-                answer.append(" ( ");
-
-                int count = 0;
-                for (Iterator i = inList.iterator(); i.hasNext();)
-                {
-                    Object o = (Object) i.next();
-                    if (count != 0)
-                    {
-                        answer.append(", ");
-                    }
-
-                    answer.append(o);
-                    count++;
-                }
-
-                answer.append(" )");
-
-                return answer.toString();
-            }
-
-            public String getExpressionSymbol()
-            {
-                if (not)
-                {
-                    return "NOT IN";
-                }
-                else
-                {
-                    return "IN";
-                }
-            }
-        };
+        return new InExpression(right, inList, not);
     }
 
-    abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression
+    abstract static class BooleanUnaryExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
     {
-        public BooleanUnaryExpression(Expression left)
+        public BooleanUnaryExpression(Expression<E> left)
         {
             super(left);
         }
 
-        public boolean matches(AMQMessage message) throws AMQException
+        public boolean matches(Filterable<E> message) throws E
         {
             Object object = evaluate(message);
 
@@ -171,26 +85,9 @@
     }
     ;
 
-    public static BooleanExpression createNOT(BooleanExpression left)
+    public static<E extends Exception> BooleanExpression<E> createNOT(BooleanExpression<E> left)
     {
-        return new BooleanUnaryExpression(left)
-        {
-            public Object evaluate(AMQMessage message) throws AMQException
-            {
-                Boolean lvalue = (Boolean) right.evaluate(message);
-                if (lvalue == null)
-                {
-                    return null;
-                }
-
-                return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE;
-            }
-
-            public String getExpressionSymbol()
-            {
-                return "NOT";
-            }
-        };
+        return new NotExpression(left);
     }
 
     public static BooleanExpression createXPath(final String xpath)
@@ -203,36 +100,9 @@
         return new XQueryExpression(xpath);
     }
 
-    public static BooleanExpression createBooleanCast(Expression left)
+    public static<E extends Exception> BooleanExpression createBooleanCast(Expression<E> left)
     {
-        return new BooleanUnaryExpression(left)
-        {
-            public Object evaluate(AMQMessage message) throws AMQException
-            {
-                Object rvalue = right.evaluate(message);
-                if (rvalue == null)
-                {
-                    return null;
-                }
-
-                if (!rvalue.getClass().equals(Boolean.class))
-                {
-                    return Boolean.FALSE;
-                }
-
-                return ((Boolean) rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE;
-            }
-
-            public String toString()
-            {
-                return right.toString();
-            }
-
-            public String getExpressionSymbol()
-            {
-                return "";
-            }
-        };
+        return new BooleanCastExpression(left);
     }
 
     private static Number negate(Number left)
@@ -281,7 +151,7 @@
         this.right = left;
     }
 
-    public Expression getRight()
+    public Expression<E> getRight()
     {
         return right;
     }
@@ -334,4 +204,166 @@
      */
     public abstract String getExpressionSymbol();
 
+    private static class NegativeExpression<E extends Exception> extends UnaryExpression<E>
+    {
+        public NegativeExpression(final Expression<E> left)
+        {
+            super(left);
+        }
+
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            Object rvalue = right.evaluate(message);
+            if (rvalue == null)
+            {
+                return null;
+            }
+
+            if (rvalue instanceof Number)
+            {
+                return negate((Number) rvalue);
+            }
+
+            return null;
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "-";
+        }
+    }
+
+    private static class InExpression<E extends Exception> extends BooleanUnaryExpression<E>
+    {
+        private final Collection _inList;
+        private final boolean _not;
+
+        public InExpression(final PropertyExpression<E> right, final Collection inList, final boolean not)
+        {
+            super(right);
+            _inList = inList;
+            _not = not;
+        }
+
+        public Object evaluate(Filterable<E> message) throws E
+        {
+
+            Object rvalue = right.evaluate(message);
+            if (rvalue == null)
+            {
+                return null;
+            }
+
+            if (rvalue.getClass() != String.class)
+            {
+                return null;
+            }
+
+            if (((_inList != null) && _inList.contains(rvalue)) ^ _not)
+            {
+                return Boolean.TRUE;
+            }
+            else
+            {
+                return Boolean.FALSE;
+            }
+
+        }
+
+        public String toString()
+        {
+            StringBuffer answer = new StringBuffer();
+            answer.append(right);
+            answer.append(" ");
+            answer.append(getExpressionSymbol());
+            answer.append(" ( ");
+
+            int count = 0;
+            for (Iterator i = _inList.iterator(); i.hasNext();)
+            {
+                Object o = (Object) i.next();
+                if (count != 0)
+                {
+                    answer.append(", ");
+                }
+
+                answer.append(o);
+                count++;
+            }
+
+            answer.append(" )");
+
+            return answer.toString();
+        }
+
+        public String getExpressionSymbol()
+        {
+            if (_not)
+            {
+                return "NOT IN";
+            }
+            else
+            {
+                return "IN";
+            }
+        }
+    }
+
+    private static class NotExpression<E extends Exception> extends BooleanUnaryExpression<E>
+    {
+        public NotExpression(final BooleanExpression<E> left)
+        {
+            super(left);
+        }
+
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            Boolean lvalue = (Boolean) right.evaluate(message);
+            if (lvalue == null)
+            {
+                return null;
+            }
+
+            return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE;
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "NOT";
+        }
+    }
+
+    private static class BooleanCastExpression<E extends Exception> extends BooleanUnaryExpression<E>
+    {
+        public BooleanCastExpression(final Expression<E> left)
+        {
+            super(left);
+        }
+
+        public Object evaluate(Filterable<E> message) throws E
+        {
+            Object rvalue = right.evaluate(message);
+            if (rvalue == null)
+            {
+                return null;
+            }
+
+            if (!rvalue.getClass().equals(Boolean.class))
+            {
+                return Boolean.FALSE;
+            }
+
+            return ((Boolean) rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE;
+        }
+
+        public String toString()
+        {
+            return right.toString();
+        }
+
+        public String getExpressionSymbol()
+        {
+            return "";
+        }
+    }
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Sun May 11 08:22:03 2008
@@ -23,6 +23,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -70,7 +71,7 @@
     private final XPathEvaluator evaluator;
 
     static public interface XPathEvaluator {
-        public boolean evaluate(AMQMessage message) throws AMQException;
+        public boolean evaluate(Filterable message) throws AMQException;
     }
 
     XPathExpression(String xpath) {
@@ -92,7 +93,7 @@
         }
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException {
+    public Object evaluate(Filterable message) throws AMQException {
 //        try {
 //FIXME this is flow to disk work
 //            if( message.isDropped() )
@@ -117,7 +118,7 @@
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws AMQException
      */
-    public boolean matches(AMQMessage message) throws AMQException
+    public boolean matches(Filterable message) throws AMQException
     {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Sun May 11 08:22:03 2008
@@ -19,6 +19,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 
 //
 // Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
@@ -35,7 +36,7 @@
         this.xpath = xpath;
     }
 
-    public Object evaluate(AMQMessage message) throws AMQException {
+    public Object evaluate(Filterable message) throws AMQException {
         return Boolean.FALSE;
     }
 
@@ -48,7 +49,7 @@
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws AMQException
      */
-    public boolean matches(AMQMessage message) throws AMQException
+    public boolean matches(Filterable message) throws AMQException
     {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;            

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Sun May 11 08:22:03 2008
@@ -29,6 +29,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
 import org.apache.xpath.CachedXPathAPI;
 import org.w3c.dom.Document;
 import org.w3c.dom.traversal.NodeIterator;
@@ -42,7 +43,7 @@
         this.xpath = xpath;
     }
     
-    public boolean evaluate(AMQMessage m) throws AMQException
+    public boolean evaluate(Filterable m) throws AMQException
     {
         // TODO - we would have to check the content type and then evaluate the content
         //        here... is this really a feature we wish to implement? - RobG

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java Sun May 11 08:22:03 2008
@@ -54,4 +54,9 @@
             notifyListeners(suspended);
         }
     }
+
+    protected final void notifyIncreaseBytesCredit()
+    {
+        notifyListeners(false);
+    }
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java Sun May 11 08:22:03 2008
@@ -28,7 +28,7 @@
 {
     private final AtomicLong _messageCredit;
 
-    MessageOnlyCreditManager(final long initialCredit)
+    public MessageOnlyCreditManager(final long initialCredit)
     {
         _messageCredit = new AtomicLong(initialCredit);
     }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java Sun May 11 08:22:03 2008
@@ -84,8 +84,10 @@
     public synchronized void addCredit(final long messageCredit, final long bytesCredit)
     {
         final long messageCreditLimit = _messageCreditLimit;
+        boolean notifyIncrease = true;
         if(messageCreditLimit != 0L)
         {
+            notifyIncrease = (_messageCredit != 0);
             long newCredit = _messageCredit + messageCredit;
             _messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit;
         }
@@ -96,8 +98,14 @@
         {
             long newCredit = _bytesCredit + bytesCredit;
             _bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit;
+            if(notifyIncrease && bytesCredit>0)
+            {
+                notifyIncreaseBytesCredit();
+            }
         }
 
+
+
         setSuspended(!hasCredit());
 
     }
@@ -138,7 +146,7 @@
                     }
                     else
                     {
-                        setSuspended(true);
+                        //setSuspended(true);
                         return false;
                     }
                 }
@@ -166,7 +174,7 @@
                 }
                 else
                 {
-                    setSuspended(true);
+                    //setSuspended(true);
                     return false;
                 }
             }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Sun May 11 08:22:03 2008
@@ -26,11 +26,21 @@
 import org.apache.qpid.framing.BasicGetBody;
 import org.apache.qpid.framing.BasicGetEmptyBody;
 import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueueImpl;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
 import org.apache.qpid.server.security.access.Permission;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -86,7 +96,7 @@
                 //Perform ACLs
                 vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue);
 
-                if (!queue.performGet(session, channel, !body.getNoAck()))
+                if (!performGet(queue,session, channel, !body.getNoAck()))
                 {
                     MethodRegistry methodRegistry = session.getMethodRegistry();
                     // TODO - set clusterId
@@ -98,4 +108,80 @@
             }
         }
     }
+
+    public static boolean performGet(final AMQQueue queue,
+                                     final AMQProtocolSession session,
+                                     final AMQChannel channel,
+                                     final boolean acks)
+            throws AMQException
+    {
+
+        final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
+
+        final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
+        {
+
+            int _msg;
+
+            public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            throws AMQException
+            {
+                singleMessageCredit.useCreditForMessage(entry.getMessage());
+                session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
+                                                                        deliveryTag, queue.getMessageCount());
+
+            }
+        };
+        final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+        {
+
+            public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            {
+                channel.addUnacknowledgedMessage(entry, deliveryTag, null);
+            }
+        };
+
+        Subscription sub;
+        if(acks)
+        {
+            sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+        }
+        else
+        {
+            sub = new GetNoAckSubscription(channel,
+                                                 session,
+                                                 null,
+                                                 null,
+                                                 false,
+                                                 singleMessageCredit,
+                                                 getDeliveryMethod,
+                                                 getRecordMethod);
+        }
+
+        queue.registerSubscription(sub,false);
+        queue.flushSubscription(sub);
+        queue.unregisterSubscription(sub);
+        return(!singleMessageCredit.hasCredit());
+
+
+    }
+
+    public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
+    {
+        public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+                               AMQShortString consumerTag, FieldTable filters,
+                               boolean noLocal, FlowCreditManager creditManager,
+                                   ClientDeliveryMethod deliveryMethod,
+                                   RecordDeliveryMethod recordMethod)
+            throws AMQException
+        {
+            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+        }
+
+        public boolean wouldSuspend(QueueEntry msg)
+        {
+            return !getCreditManager().useCreditForMessage(msg.getMessage());
+        }
+
+    }
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Sun May 11 08:22:03 2008
@@ -174,7 +174,9 @@
     {
         final QueueRegistry registry = virtualHost.getQueueRegistry();
         AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
-        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost);
+
+        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost,
+                                                                  body.getArguments());
 
 
         if (body.getExclusive() && !body.getDurable())

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Sun May 11 08:22:03 2008
@@ -42,7 +42,7 @@
 /**
  * A deliverable message.
  */
-public class AMQMessage
+public class AMQMessage implements Filterable<AMQException>
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);

Added: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Sun May 11 08:22:03 2008
@@ -0,0 +1,41 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.AMQException;
+
+public class AMQPriorityQueue extends SimpleAMQQueue
+{
+    protected AMQPriorityQueue(final AMQShortString name,
+                               final boolean durable,
+                               final AMQShortString owner,
+                               final boolean autoDelete,
+                               final VirtualHost virtualHost,
+                               int priorities)
+            throws AMQException
+    {
+        super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
+    }
+
+    
+}

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sun May 11 08:22:03 2008
@@ -34,7 +34,7 @@
 import java.util.List;
 import java.util.Set;
 
-public interface AMQQueue extends Managable, Comparable
+public interface AMQQueue extends Managable, Comparable<AMQQueue>
 {
 
     AMQShortString getName();
@@ -147,11 +147,6 @@
 
     void deliverAsync();
 
-
-    boolean performGet(final AMQProtocolSession session, final AMQChannel channel, final boolean b) throws AMQException;
-
-
-
     void addQueueDeleteTask(final Task task);
 
     boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
@@ -160,6 +155,8 @@
 
     Set<NotificationCheck> getNotificationChecks();
 
+    void flushSubscription(final Subscription sub) throws AMQException;
+
 
     /**
      * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun May 11 08:22:03 2008
@@ -21,20 +21,32 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.AMQException;
 
 
 public class AMQQueueFactory
 {
+    private static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+
     public static AMQQueue createAMQQueueImpl(AMQShortString name,
-                                                  boolean durable,
-                                                  AMQShortString owner,
-                                                  boolean autoDelete,
-                                                  VirtualHost virtualHost)
+                                              boolean durable,
+                                              AMQShortString owner,
+                                              boolean autoDelete,
+                                              VirtualHost virtualHost, final FieldTable arguments)
             throws AMQException
     {
-        //return new AMQQueueImpl(name, durable, owner, autoDelete, virtualHost);
-        return new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
+
+        final int priorities = arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
+
+        if(priorities > 1)
+        {
+            return new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities);
+        }
+        else
+        {
+            return new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
+        }
     }
 }

Added: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java Sun May 11 08:22:03 2008
@@ -0,0 +1,33 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
+
+public interface Filterable<E extends Exception>
+{
+    ContentHeaderBody getContentHeaderBody() throws E;
+
+    boolean isPersistent() throws E;
+
+    boolean isRedelivered();
+}

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Sun May 11 08:22:03 2008
@@ -38,8 +38,9 @@
 
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Collection;
 
-public class IncomingMessage
+public class IncomingMessage implements Filterable<RuntimeException>
 {
 
     /** Used for debugging purposes. */
@@ -66,7 +67,7 @@
      * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
      * by the message handle.
      */
-    private List<AMQQueue> _destinationQueues;
+    private Collection<AMQQueue> _destinationQueues;
 
     private AMQProtocolSession _publisher;
     private MessageStore _messageStore;
@@ -160,7 +161,7 @@
 
         // we get a reference to the destination queues now so that we can clear the
         // transient message data as quickly as possible
-        List<AMQQueue> destinationQueues = _destinationQueues;
+        Collection<AMQQueue> destinationQueues = _destinationQueues;
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Delivering message " + _messageId + " to " + destinationQueues);
@@ -175,9 +176,6 @@
             _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
                                                           _messagePublishInfo, getContentHeaderBody());
 
-            // we then allow the transactional context to do something with the message content
-            // now that it has all been received, before we attempt delivery
-            _txnContext.messageFullyReceived(isPersistent());
 
             message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
             message.setPublisherIdentifier(_publisher.getClientIdentifier());
@@ -214,6 +212,10 @@
                 }
             }
 
+            // we then allow the transactional context to do something with the message content
+            // now that it has all been received, before we attempt delivery
+            _txnContext.messageFullyReceived(isPersistent());
+
             return message;
         }
         finally
@@ -283,6 +285,11 @@
              ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == 2;
     }
 
+    /**
+     * Reports the redelivery flag; this implementation always returns {@code false}.
+     */
+    public boolean isRedelivered()
+    {
+        return false;
+    }
+
     public void setMessageStore(final MessageStore messageStore)
     {
         _messageStore = messageStore;
@@ -303,7 +310,7 @@
         _exchange.route(this);
     }
 
-    public void enqueue(final List<AMQQueue> queues)
+    public void enqueue(final Collection<AMQQueue> queues)
     {
         _destinationQueues = queues;
     }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java Sun May 11 08:22:03 2008
@@ -126,7 +126,7 @@
      * @param age  maximum age of message.
      * @throws IOException
      */
-    @MBeanAttribute(name="MaximumMessageAge", description="Threshold high value for message age on thr broker")
+    @MBeanAttribute(name="MaximumMessageAge", description="Threshold high value for message age on the broker")
     void setMaximumMessageAge(Long age) throws IOException;
 
     /**

Added: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Sun May 11 08:22:03 2008
@@ -0,0 +1,164 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.AMQException;
+
+/**
+ * A {@link QueueEntryList} made up of one {@link SimpleQueueEntryList} per priority level.
+ *
+ * Messages are placed in the sub-list selected by their priority header; sub-lists with a higher
+ * index are consulted first when iterating, and {@link #getHead()} returns the head of the
+ * highest-index sub-list.
+ */
+public class PriorityQueueList implements QueueEntryList
+{
+    private final AMQQueue _queue;
+    private final QueueEntryList[] _priorityLists;
+    private final int _priorities;
+    private final int _priorityOffset;
+
+    /**
+     * @param queue      the queue that owns the entries
+     * @param priorities the number of distinct priority levels (and therefore sub-lists); must be at least 1
+     * @throws IllegalArgumentException if {@code priorities} is less than 1
+     */
+    public PriorityQueueList(AMQQueue queue, int priorities)
+    {
+        // Fail fast: with no sub-lists every later operation would fail with an index error
+        if(priorities < 1)
+        {
+            throw new IllegalArgumentException("Number of priorities must be at least 1, but was " + priorities);
+        }
+        _queue = queue;
+        _priorityLists = new QueueEntryList[priorities];
+        _priorities = priorities;
+        // Offset subtracted from the message priority to obtain the sub-list index (see add())
+        _priorityOffset = 5-((priorities + 1)/2);
+        for(int i = 0; i < priorities; i++)
+        {
+            _priorityLists[i] = new SimpleQueueEntryList(queue);
+        }
+    }
+
+    public AMQQueue getQueue()
+    {
+        return _queue;
+    }
+
+    /**
+     * Adds the message to the sub-list matching its priority; priorities that map outside the
+     * configured range are clamped to the lowest or highest sub-list.
+     *
+     * @throws RuntimeException wrapping any {@link AMQException} raised while reading the content header
+     */
+    public QueueEntry add(AMQMessage message)
+    {
+        try
+        {
+            int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+            if(index >= _priorities)
+            {
+                index = _priorities-1;
+            }
+            else if(index < 0)
+            {
+                index = 0;
+            }
+            return _priorityLists[index].add(message);
+        }
+        catch (AMQException e)
+        {
+            // TODO - fix AMQ Exception
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * Returns the entry following {@code node}: its successor within its own sub-list if there is
+     * one, otherwise the first entry found while walking down through the lower-index sub-lists.
+     *
+     * @return the next entry, or {@code null} if none was found
+     * @throws IllegalArgumentException if {@code node} does not belong to one of this list's sub-lists
+     */
+    public QueueEntry next(QueueEntry node)
+    {
+        QueueEntryImpl nodeImpl = (QueueEntryImpl)node;
+        QueueEntry next = nodeImpl.getNext();
+
+        if(next == null)
+        {
+            int index = indexOf(nodeImpl.getQueueEntryList());
+
+            while(next == null && index != 0)
+            {
+                index--;
+                next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext();
+            }
+
+        }
+        return next;
+    }
+
+    /** Finds the index of the given sub-list, searching from the highest index downwards. */
+    private int indexOf(QueueEntryList entryList)
+    {
+        for(int index = _priorityLists.length-1; index >= 0; index--)
+        {
+            if(_priorityLists[index] == entryList)
+            {
+                return index;
+            }
+        }
+        throw new IllegalArgumentException("Queue entry does not belong to this priority queue list");
+    }
+
+    /**
+     * Iterator holding one underlying iterator per sub-list; each call to {@link #advance()} tries
+     * the sub-list iterators from the highest index down and stops at the first one that moves.
+     */
+    private final class PriorityQueueEntryListIterator implements QueueEntryIterator
+    {
+        private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
+        private QueueEntry _lastNode;
+
+        PriorityQueueEntryListIterator()
+        {
+            for(int i = 0; i < _priorityLists.length; i++)
+            {
+                _iterators[i] = _priorityLists[i].iterator();
+            }
+            // Start positioned on the node of the highest-index sub-list's iterator
+            _lastNode = _iterators[_iterators.length - 1].getNode();
+        }
+
+
+        /** Returns {@code true} only when every sub-list iterator reports that it is at its tail. */
+        public boolean atTail()
+        {
+            for(int i = 0; i < _iterators.length; i++)
+            {
+                if(!_iterators[i].atTail())
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        /** Returns the node reached by the most recent successful advance (or the initial node). */
+        public QueueEntry getNode()
+        {
+            return _lastNode;
+        }
+
+        /** Advances the highest-index sub-list iterator able to move; returns {@code false} if none could. */
+        public boolean advance()
+        {
+            for(int i = _iterators.length-1; i >= 0; i--)
+            {
+                if(_iterators[i].advance())
+                {
+                    _lastNode = _iterators[i].getNode();
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    public QueueEntryIterator iterator()
+    {
+        return new PriorityQueueEntryListIterator();
+    }
+
+    /** Returns the head entry of the highest-index sub-list. */
+    public QueueEntry getHead()
+    {
+        return _priorityLists[_priorities-1].getHead();
+    }
+
+    /** Factory producing {@link PriorityQueueList} instances with a fixed number of priorities. */
+    static class Factory implements QueueEntryListFactory
+    {
+        private final int _priorities;
+
+        Factory(int priorities)
+        {
+            _priorities = priorities;
+        }
+
+        public QueueEntryList createQueueEntryList(AMQQueue queue)
+        {
+            return new PriorityQueueList(queue, _priorities);
+        }
+    }
+}

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sun May 11 08:22:03 2008
@@ -26,6 +26,99 @@
 */
 public interface QueueEntry extends Comparable<QueueEntry>
 {
+
+
+
+    /** The coarse-grained states an entry can report through {@link EntryState#getState()}. */
+    public static enum State
+    {
+        AVAILABLE,
+        ACQUIRED,
+        EXPIRED,
+        DEQUEUED
+    }
+
+    /** Callback invoked when a queue entry moves from one {@link State} to another. */
+    public static interface StateChangeListener
+    {
+        /**
+         * @param entry    the entry whose state changed
+         * @param oldState the state the entry was in before the change
+         * @param newState the state the entry is in after the change
+         */
+        public void stateChanged(QueueEntry entry, State oldState, State newState);
+    }
+
+    /**
+     * Base class for the state objects held by an entry. The constructor is private, so only
+     * classes nested inside {@code QueueEntry} can extend it.
+     */
+    public abstract class EntryState
+    {
+        private EntryState()
+        {
+        }
+
+        /** Returns the {@link State} value this state object represents. */
+        public abstract State getState();
+    }
+
+
+    /** State object reporting {@link State#AVAILABLE}. */
+    public final class AvailableState extends EntryState
+    {
+
+        public State getState()
+        {
+            return State.AVAILABLE;
+        }
+    }
+
+
+    /** State object reporting {@link State#DEQUEUED}; note the class is named "Deleted" while the enum value is DEQUEUED. */
+    public final class DeletedState extends EntryState
+    {
+
+        public State getState()
+        {
+            return State.DEQUEUED;
+        }
+    }
+
+    /** State object reporting {@link State#EXPIRED}. */
+    public final class ExpiredState extends EntryState
+    {
+
+        public State getState()
+        {
+            return State.EXPIRED;
+        }
+    }
+
+
+    /** State object reporting {@link State#ACQUIRED} without recording any owning subscription. */
+    public final class NonSubscriptionAcquiredState extends EntryState
+    {
+        public State getState()
+        {
+            return State.ACQUIRED;
+        }
+    }
+
+    /** State object reporting {@link State#ACQUIRED} and carrying the subscription passed to its constructor. */
+    public final class SubscriptionAcquiredState extends EntryState
+    {
+        private final Subscription _subscription;
+
+        /**
+         * @param subscription the subscription to associate with this acquired state
+         */
+        public SubscriptionAcquiredState(Subscription subscription)
+        {
+            _subscription = subscription;
+        }
+
+
+        public State getState()
+        {
+            return State.ACQUIRED;
+        }
+
+        /** Returns the subscription supplied at construction. */
+        public Subscription getSubscription()
+        {
+            return _subscription;
+        }
+    }
+
+
+    // Shared singleton instances of the state objects that carry no per-entry data.
+    // (SubscriptionAcquiredState has no shared instance because it holds a subscription.)
+    final static EntryState AVAILABLE_STATE = new AvailableState();
+    final static EntryState DELETED_STATE = new DeletedState();
+    final static EntryState EXPIRED_STATE = new ExpiredState();
+    final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
+
+
+
+
     AMQQueue getQueue();
 
     AMQMessage getMessage();
@@ -38,8 +131,12 @@
 
     boolean isAcquired();
 
+    boolean acquire();
     boolean acquire(Subscription sub);
 
+    boolean delete();
+    boolean isDeleted();
+
     boolean acquiredBySubscription();
 
     void setDeliveredToSubscription();
@@ -71,4 +168,7 @@
     void discard(StoreContext storeContext) throws AMQException;
 
     boolean isQueueDeleted();
+
+    void addStateChangeListener(StateChangeListener listener);
+    boolean removeStateChangeListener(StateChangeListener listener);
 }

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun May 11 08:22:03 2008
@@ -27,8 +27,9 @@
 
 import java.util.Set;
 import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 
 public class QueueEntryImpl implements QueueEntry
@@ -39,42 +40,77 @@
      */
     private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
 
-    private final AMQQueue _queue;
+    private final SimpleQueueEntryList _queueEntryList;
+
     private final AMQMessage _message;
 
 
     private Set<Subscription> _rejectedBy = null;
 
-    private final AtomicReference<Object> _owner = new AtomicReference<Object>();
-    private final AtomicLong _entryId = new AtomicLong();
+    private volatile EntryState _state = AVAILABLE_STATE;
+
+    private static final
+        AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
+            _stateUpdater =
+        AtomicReferenceFieldUpdater.newUpdater
+        (QueueEntryImpl.class, EntryState.class, "_state");
+
+
+    private volatile Set<StateChangeListener> _stateChangeListeners;
+
+    private static final
+        AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
+                _listenersUpdater =
+        AtomicReferenceFieldUpdater.newUpdater
+        (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
+
 
+    private static final
+        AtomicLongFieldUpdater<QueueEntryImpl>
+            _entryIdUpdater =
+        AtomicLongFieldUpdater.newUpdater
+        (QueueEntryImpl.class, "_entryId");
 
-    public QueueEntryImpl(AMQQueue queue, AMQMessage message, final long entryId)
+
+    private volatile long _entryId;
+
+    volatile QueueEntryImpl _next;
+
+
+    QueueEntryImpl(SimpleQueueEntryList queueEntryList)
     {
-        _queue = queue;
+        this(queueEntryList,null,Long.MIN_VALUE);
+        _state = DELETED_STATE;
+    }
+
+
+    public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
+    {
+        _queueEntryList = queueEntryList;
         _message = message;
-        _entryId.set(entryId);
+
+        _entryIdUpdater.set(this, entryId);
     }
 
-    public QueueEntryImpl(AMQQueue queue, AMQMessage message)
+    public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
     {
-        _queue = queue;
+        _queueEntryList = queueEntryList;
         _message = message;
     }
 
     protected void setEntryId(long entryId)
     {
-        _entryId.set(entryId);
+        _entryIdUpdater.set(this, entryId);
     }
 
     protected long getEntryId()
     {
-        return _entryId.get();
+        return _entryId;
     }
 
     public AMQQueue getQueue()
     {
-        return _queue;
+        return _queueEntryList.getQueue();
     }
 
     public AMQMessage getMessage()
@@ -94,23 +130,39 @@
 
     public boolean expired() throws AMQException
     {
-        return getMessage().expired(_queue);
+        return getMessage().expired(getQueue());
     }
 
     public boolean isAcquired()
     {
-        return _owner.get() != null;
+        return _state.getState() == State.ACQUIRED;
+    }
+
+    /**
+     * Attempts to acquire this entry without an owning subscription, using the shared
+     * {@code NON_SUBSCRIPTION_ACQUIRED_STATE}.
+     *
+     * @return the result of the underlying state transition attempt
+     */
+    public boolean acquire()
+    {
+        return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
+    }
+
+    /**
+     * Atomically moves this entry from {@code AVAILABLE_STATE} to the supplied state and, if the
+     * transition happened and listeners are registered, notifies them of AVAILABLE -> ACQUIRED.
+     *
+     * @param state the acquired state to install
+     * @return {@code true} if the entry was available and is now in {@code state}
+     */
+    private boolean acquire(final EntryState state)
+    {
+        if(!_stateUpdater.compareAndSet(this,AVAILABLE_STATE, state))
+        {
+            // Entry was not available, so nothing changed and nobody is notified
+            return false;
+        }
+
+        if(_stateChangeListeners != null)
+        {
+            notifyStateChange(State.AVAILABLE, State.ACQUIRED);
+        }
+        return true;
+    }
 
     public boolean acquire(Subscription sub)
     {
-        return !(_owner.compareAndSet(null, sub == null ? this : sub));
+        return acquire(sub.getOwningState());
     }
 
     public boolean acquiredBySubscription()
     {
-        Object owner = _owner.get();
-        return (owner != null) && (owner != this);
+
+        return (_state instanceof SubscriptionAcquiredState);
     }
 
     public void setDeliveredToSubscription()
@@ -120,7 +172,7 @@
 
     public void release()
     {
-        _owner.set(null);
+        _stateUpdater.set(this,AVAILABLE_STATE);
     }
 
     public String debugIdentity()
@@ -141,18 +193,16 @@
 
     public Subscription getDeliveredSubscription()
     {
-        synchronized (this)
-        {
-            Object owner = _owner.get();
-            if (owner instanceof Subscription)
+            EntryState state = _state;
+            if (state instanceof SubscriptionAcquiredState)
             {
-                return (Subscription) owner;
+                return ((SubscriptionAcquiredState) state).getSubscription();
             }
             else
             {
                 return null;
             }
-        }
+
     }
 
     public void reject()
@@ -193,25 +243,44 @@
 
     public void requeue(final StoreContext storeContext) throws AMQException
     {
-        _queue.requeue(storeContext, this);
+        getQueue().requeue(storeContext, this);
+        if(_stateChangeListeners != null)
+        {
+            notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+        }
     }
 
     public void dequeue(final StoreContext storeContext) throws FailedDequeueException
     {
+
+
         getQueue().dequeue(storeContext, this);
+        if(_stateChangeListeners != null)
+        {
+            notifyStateChange(_state.getState() , QueueEntry.State.DEQUEUED);
+        }
+    }
+
+    /**
+     * Invokes {@code stateChanged} on every registered listener. The visible callers check that
+     * {@code _stateChangeListeners} is non-null before calling this method.
+     *
+     * @param oldState the state reported as the one being left
+     * @param newState the state reported as the one being entered
+     */
+    private void notifyStateChange(final State oldState, final State newState)
+    {
+        for(StateChangeListener l : _stateChangeListeners)
+        {
+            l.stateChanged(this, oldState, newState);
+        }
+    }
 
     public void dispose(final StoreContext storeContext) throws MessageCleanupException
     {
         getMessage().decrementReference(storeContext);
+        delete();
     }
 
     public void restoreCredit()
     {
-        Object owner = _owner.get();
-        if(owner instanceof Subscription)
+        // Take a single snapshot of the volatile state: the instanceof check and the cast must
+        // use the same object, otherwise a concurrent state change could cause a ClassCastException
+        EntryState state = _state;
+        if(state instanceof SubscriptionAcquiredState)
         {
-            Subscription s = (Subscription) owner;
+            Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
             s.restoreCredit(this);
         }
     }
@@ -232,9 +301,85 @@
         return getQueue().isDeleted();
     }
 
+    /**
+     * Registers a listener for state changes on this entry. The listener set is created lazily:
+     * if none exists yet, a new {@link CopyOnWriteArraySet} is installed with a compare-and-set
+     * and the field is then re-read, so the set that actually won the installation is the one used.
+     *
+     * @param listener the listener to add
+     */
+    public void addStateChangeListener(StateChangeListener listener)
+    {
+        Set<StateChangeListener> listeners = _stateChangeListeners;
+        if(listeners == null)
+        {
+            // compareAndSet only succeeds for one caller; everyone re-reads the installed set
+            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
+            listeners = _stateChangeListeners;
+        }
+
+        listeners.add(listener);
+    }
+
+    /**
+     * Unregisters a previously added listener.
+     *
+     * @param listener the listener to remove
+     * @return {@code true} if a listener set exists and the listener was removed from it
+     */
+    public boolean removeStateChangeListener(StateChangeListener listener)
+    {
+        // Single read of the field; no set means nothing was ever registered
+        final Set<StateChangeListener> registered = _stateChangeListeners;
+        return registered != null && registered.remove(listener);
+    }
+
+
     public int compareTo(final QueueEntry o)
     {
         QueueEntryImpl other = (QueueEntryImpl)o;
         return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
     }
+
+    /**
+     * Returns the next entry after this one that is not deleted.
+     *
+     * While the immediate successor is deleted and itself has a successor, this method attempts
+     * (via compare-and-set on the next pointer) to link this entry past the deleted one and then
+     * re-reads the successor. If a deleted successor has no successor of its own, the search
+     * stops and {@code null} is returned.
+     *
+     * @return the next non-deleted entry, or {@code null} if there is none
+     */
+    public QueueEntryImpl getNext()
+    {
+
+        QueueEntryImpl next = nextNode();
+        while(next != null && next.isDeleted())
+        {
+
+            final QueueEntryImpl newNext = next.nextNode();
+            if(newNext != null)
+            {
+                // The CAS result is ignored: whether or not it succeeded, re-read the current successor
+                SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+                next = nextNode();
+            }
+            else
+            {
+                next = null;
+            }
+
+        }
+        return next;
+    }
+
+    /** Returns the raw successor pointer, without skipping deleted entries. */
+    QueueEntryImpl nextNode()
+    {
+        return _next;
+    }
+
+    /** Returns {@code true} if this entry's current state is the shared {@code DELETED_STATE} instance. */
+    public boolean isDeleted()
+    {
+        return _state == DELETED_STATE;
+    }
+
+    /**
+     * Attempts to mark this entry as deleted with a single compare-and-set from the state observed
+     * on entry to {@code DELETED_STATE}. On success the owning list's {@code advanceHead()} is called.
+     *
+     * @return {@code true} if this call performed the transition; {@code false} if the entry was
+     *         already deleted or its state changed between the read and the compare-and-set
+     */
+    public boolean delete()
+    {
+        EntryState state = _state;
+
+        if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
+        {
+            _queueEntryList.advanceHead();
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    /** Returns the list this entry was created for. */
+    public QueueEntryList getQueueEntryList()
+    {
+        return _queueEntryList;
+    }
 }

Added: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java Sun May 11 08:22:03 2008
@@ -0,0 +1,30 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+/**
+ * Cursor over the entries of a {@link QueueEntryList}.
+ */
+public interface QueueEntryIterator
+{
+    /** Returns whether the iterator is at the tail of its list (presumably meaning no further entries are currently available - confirm against implementations). */
+    boolean atTail();
+
+    /** Returns the entry the iterator is currently positioned on. */
+    QueueEntry getNode();
+
+    /** Attempts to move to another entry; returns {@code true} if the iterator moved. */
+    boolean advance();
+}

Added: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Sun May 11 08:22:03 2008
@@ -0,0 +1,34 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+/**
+ * Container of the {@link QueueEntry} instances belonging to a queue.
+ */
+public interface QueueEntryList
+{
+    /** Returns the queue this list belongs to. */
+    AMQQueue getQueue();
+
+    /** Adds the message to the list and returns the entry created for it. */
+    QueueEntry add(AMQMessage message);
+
+    /** Returns the entry following {@code node}, or {@code null} if there is none. */
+    QueueEntry next(QueueEntry node);
+
+    /** Returns a new iterator over this list. */
+    QueueEntryIterator iterator();
+
+    /** Returns the head entry of the list. */
+    QueueEntry getHead();
+}