You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/11 17:22:05 UTC
svn commit: r655323 [2/4] - in
/incubator/qpid/branches/broker-queue-refactor/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main...
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java Sun May 11 08:22:03 2008
@@ -22,71 +22,22 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* A filter performing a comparison of two objects
*/
-public abstract class LogicExpression extends BinaryExpression implements BooleanExpression
+public abstract class LogicExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
{
- public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue)
+ public static<E extends Exception> BooleanExpression createOR(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
{
- return new LogicExpression(lvalue, rvalue)
- {
-
- public Object evaluate(AMQMessage message) throws AMQException
- {
-
- Boolean lv = (Boolean) left.evaluate(message);
- // Can we do an OR shortcut??
- if ((lv != null) && lv.booleanValue())
- {
- return Boolean.TRUE;
- }
-
- Boolean rv = (Boolean) right.evaluate(message);
-
- return (rv == null) ? null : rv;
- }
-
- public String getExpressionSymbol()
- {
- return "OR";
- }
- };
+ return new OrExpression(lvalue, rvalue);
}
- public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue)
+ public static<E extends Exception> BooleanExpression createAND(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
{
- return new LogicExpression(lvalue, rvalue)
- {
-
- public Object evaluate(AMQMessage message) throws AMQException
- {
-
- Boolean lv = (Boolean) left.evaluate(message);
-
- // Can we do an AND shortcut??
- if (lv == null)
- {
- return null;
- }
-
- if (!lv.booleanValue())
- {
- return Boolean.FALSE;
- }
-
- Boolean rv = (Boolean) right.evaluate(message);
-
- return (rv == null) ? null : rv;
- }
-
- public String getExpressionSymbol()
- {
- return "AND";
- }
- };
+ return new AndExpression(lvalue, rvalue);
}
/**
@@ -98,13 +49,74 @@
super(left, right);
}
- public abstract Object evaluate(AMQMessage message) throws AMQException;
+ public abstract Object evaluate(Filterable<E> message) throws E;
- public boolean matches(AMQMessage message) throws AMQException
+ public boolean matches(Filterable<E> message) throws E
{
Object object = evaluate(message);
return (object != null) && (object == Boolean.TRUE);
}
+ private static class OrExpression<E extends Exception> extends LogicExpression<E>
+ {
+ public OrExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+ {
+ super(lvalue, rvalue);
+ }
+
+ public Object evaluate(Filterable<E> message) throws E
+ {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+ // Can we do an OR shortcut??
+ if ((lv != null) && lv.booleanValue())
+ {
+ return Boolean.TRUE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+
+ return (rv == null) ? null : rv;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "OR";
+ }
+ }
+
+ private static class AndExpression<E extends Exception> extends LogicExpression<E>
+ {
+ public AndExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+ {
+ super(lvalue, rvalue);
+ }
+
+ public Object evaluate(Filterable<E> message) throws E
+ {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+
+ // Can we do an AND shortcut??
+ if (lv == null)
+ {
+ return null;
+ }
+
+ if (!lv.booleanValue())
+ {
+ return Boolean.FALSE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+
+ return (rv == null) ? null : rv;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "AND";
+ }
+ }
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Sun May 11 08:22:03 2008
@@ -22,8 +22,9 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
-public interface MessageFilter
+public interface MessageFilter<E extends Exception>
{
- boolean matches(AMQMessage message) throws AMQException;
+ boolean matches(Filterable<E> message) throws E;
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java Sun May 11 08:22:03 2008
@@ -22,7 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
public class NoConsumerFilter implements MessageFilter
{
@@ -34,7 +34,7 @@
_logger.info("Created NoConsumerFilter");
}
- public boolean matches(AMQMessage message)
+ public boolean matches(Filterable message)
{
return true;
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Sun May 11 08:22:03 2008
@@ -30,12 +30,12 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* Represents a property expression
*/
-public class PropertyExpression implements Expression
+public class PropertyExpression<E extends Exception> implements Expression<E>
{
// Constants - defined the same as JMS
private static final int NON_PERSISTENT = 1;
@@ -44,223 +44,60 @@
private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
- private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>();
+ private static final HashMap<String, Expression<? extends Exception>> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression<? extends Exception>>();
- static
{
- JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression<E>()
{
- public Object evaluate(AMQMessage message)
+ public Object evaluate(Filterable<E> message)
{
//TODO
return null;
}
});
- JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression()
- {
- public Object evaluate(AMQMessage message)
- {
- try
- {
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString replyTo = _properties.getReplyTo();
-
- return (replyTo == null) ? null : replyTo.toString();
- }
- catch (AMQException e)
- {
- _logger.warn(e);
-
- return null;
- }
-
- }
-
- });
-
- JMS_PROPERTY_EXPRESSIONS.put("JMSType", new Expression()
- {
- public Object evaluate(AMQMessage message)
- {
- try
- {
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString type = _properties.getType();
-
- return (type == null) ? null : type.toString();
- }
- catch (AMQException e)
- {
- _logger.warn(e);
-
- return null;
- }
-
- }
- });
-
- JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new Expression()
- {
- public Object evaluate(AMQMessage message)
- {
- try
- {
- int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
- if (_logger.isDebugEnabled())
- {
- _logger.debug("JMSDeliveryMode is :" + mode);
- }
-
- return mode;
- }
- catch (AMQException e)
- {
- _logger.warn(e);
- }
+ JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new ReplyToExpression());
- return NON_PERSISTENT;
- }
- });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSType", new TypeExpression());
- JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new Expression()
- {
- public Object evaluate(AMQMessage message)
- {
- try
- {
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
-
- return (int) _properties.getPriority();
- }
- catch (AMQException e)
- {
- _logger.warn(e);
- }
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new DeliveryModeExpression());
- return DEFAULT_PRIORITY;
- }
- });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new PriorityExpression());
- JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new Expression()
- {
- public Object evaluate(AMQMessage message)
- {
+ JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new MessageIDExpression());
- try
- {
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString messageId = _properties.getMessageId();
-
- return (messageId == null) ? null : messageId;
- }
- catch (AMQException e)
- {
- _logger.warn(e);
+ JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new TimestampExpression());
- return null;
- }
+ JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new CorrelationIdExpression());
- }
- });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression());
- JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new Expression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression<E>()
{
- public Object evaluate(AMQMessage message)
- {
-
- try
- {
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
-
- return _properties.getTimestamp();
- }
- catch (AMQException e)
- {
- _logger.warn(e);
-
- return null;
- }
-
- }
- });
-
- JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new Expression()
- {
- public Object evaluate(AMQMessage message)
- {
-
- try
- {
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString correlationId = _properties.getCorrelationId();
-
- return (correlationId == null) ? null : correlationId.toString();
- }
- catch (AMQException e)
- {
- _logger.warn(e);
-
- return null;
- }
-
- }
- });
-
- JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new Expression()
- {
- public Object evaluate(AMQMessage message)
- {
-
- try
- {
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
-
- return _properties.getExpiration();
- }
- catch (AMQException e)
- {
- _logger.warn(e);
-
- return null;
- }
-
- }
- });
-
- JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression()
- {
- public Object evaluate(AMQMessage message)
+ public Object evaluate(Filterable message) throws E
{
return message.isRedelivered();
}
});
-
}
private final String name;
- private final Expression jmsPropertyExpression;
+ private final Expression<E> jmsPropertyExpression;
+
+ public boolean outerTest()
+ {
+ return false;
+ }
public PropertyExpression(String name)
{
this.name = name;
- jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name);
+
+
+
+ jmsPropertyExpression = (Expression<E>) JMS_PROPERTY_EXPRESSIONS.get(name);
}
- public Object evaluate(AMQMessage message) throws AMQException
+ public Object evaluate(Filterable<E> message) throws E
{
if (jmsPropertyExpression != null)
@@ -319,4 +156,113 @@
}
+ private static class ReplyToExpression<E extends Exception> implements Expression<E>
+ {
+ public Object evaluate(Filterable<E> message) throws E
+ {
+
+ CommonContentHeaderProperties _properties =
+ (CommonContentHeaderProperties)
+ message.getContentHeaderBody().properties;
+ AMQShortString replyTo = _properties.getReplyTo();
+
+ return (replyTo == null) ? null : replyTo.toString();
+
+ }
+
+ }
+
+ private static class TypeExpression<E extends Exception> implements Expression<E>
+ {
+ public Object evaluate(Filterable<E> message) throws E
+ {
+ CommonContentHeaderProperties _properties =
+ (CommonContentHeaderProperties)
+ message.getContentHeaderBody().properties;
+ AMQShortString type = _properties.getType();
+
+ return (type == null) ? null : type.toString();
+
+ }
+ }
+
+ private static class DeliveryModeExpression<E extends Exception> implements Expression<E>
+ {
+ public Object evaluate(Filterable<E> message) throws E
+ {
+ int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("JMSDeliveryMode is :" + mode);
+ }
+
+ return mode;
+ }
+ }
+
+ private static class PriorityExpression<E extends Exception> implements Expression<E>
+ {
+ public Object evaluate(Filterable<E> message) throws E
+ {
+ CommonContentHeaderProperties _properties =
+ (CommonContentHeaderProperties)
+ message.getContentHeaderBody().properties;
+
+ return (int) _properties.getPriority();
+ }
+ }
+
+ private static class MessageIDExpression<E extends Exception> implements Expression<E>
+ {
+ public Object evaluate(Filterable<E> message) throws E
+ {
+
+ CommonContentHeaderProperties _properties =
+ (CommonContentHeaderProperties)
+ message.getContentHeaderBody().properties;
+ AMQShortString messageId = _properties.getMessageId();
+
+ return (messageId == null) ? null : messageId;
+
+ }
+ }
+
+ private static class TimestampExpression<E extends Exception> implements Expression<E>
+ {
+ public Object evaluate(Filterable<E> message) throws E
+ {
+ CommonContentHeaderProperties _properties =
+ (CommonContentHeaderProperties)
+ message.getContentHeaderBody().properties;
+
+ return _properties.getTimestamp();
+ }
+ }
+
+ private static class CorrelationIdExpression<E extends Exception> implements Expression<E>
+ {
+ public Object evaluate(Filterable<E> message) throws E
+ {
+ CommonContentHeaderProperties _properties =
+ (CommonContentHeaderProperties)
+ message.getContentHeaderBody().properties;
+ AMQShortString correlationId = _properties.getCorrelationId();
+
+ return (correlationId == null) ? null : correlationId.toString();
+ }
+ }
+
+ private static class ExpirationExpression<E extends Exception> implements Expression<E>
+ {
+ public Object evaluate(Filterable<E> message) throws E
+ {
+
+ CommonContentHeaderProperties _properties =
+ (CommonContentHeaderProperties)
+ message.getContentHeaderBody().properties;
+
+ return _properties.getExpiration();
+
+ }
+ }
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java Sun May 11 08:22:03 2008
@@ -25,32 +25,33 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
-public class SimpleFilterManager implements FilterManager
+public class SimpleFilterManager implements FilterManager<AMQException>
{
private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
- private final ConcurrentLinkedQueue<MessageFilter> _filters;
+ private final ConcurrentLinkedQueue<MessageFilter<AMQException>> _filters;
public SimpleFilterManager()
{
_logger.debug("Creating SimpleFilterManager");
- _filters = new ConcurrentLinkedQueue<MessageFilter>();
+ _filters = new ConcurrentLinkedQueue<MessageFilter<AMQException>>();
}
- public void add(MessageFilter filter)
+ public void add(MessageFilter<AMQException> filter)
{
_filters.add(filter);
}
- public void remove(MessageFilter filter)
+ public void remove(MessageFilter<AMQException> filter)
{
_filters.remove(filter);
}
- public boolean allAllow(AMQMessage msg)
+ public boolean allAllow(Filterable<AMQException> msg)
{
- for (MessageFilter filter : _filters)
+ for (MessageFilter<AMQException> filter : _filters)
{
try
{
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java Sun May 11 08:22:03 2008
@@ -30,45 +30,23 @@
import java.util.List;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
/**
* An expression which performs an operation on two expression values
*/
-public abstract class UnaryExpression implements Expression
+public abstract class UnaryExpression<E extends Exception> implements Expression<E>
{
private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE);
- protected Expression right;
+ protected Expression<E> right;
- public static Expression createNegate(Expression left)
+ public static<E extends Exception> Expression<E> createNegate(Expression<E> left)
{
- return new UnaryExpression(left)
- {
- public Object evaluate(AMQMessage message) throws AMQException
- {
- Object rvalue = right.evaluate(message);
- if (rvalue == null)
- {
- return null;
- }
-
- if (rvalue instanceof Number)
- {
- return negate((Number) rvalue);
- }
-
- return null;
- }
-
- public String getExpressionSymbol()
- {
- return "-";
- }
- };
+ return new NegativeExpression(left);
}
- public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not)
+ public static<E extends Exception> BooleanExpression createInExpression(PropertyExpression<E> right, List elements, final boolean not)
{
// Use a HashSet if there are many elements.
@@ -88,81 +66,17 @@
final Collection inList = t;
- return new BooleanUnaryExpression(right)
- {
- public Object evaluate(AMQMessage message) throws AMQException
- {
-
- Object rvalue = right.evaluate(message);
- if (rvalue == null)
- {
- return null;
- }
-
- if (rvalue.getClass() != String.class)
- {
- return null;
- }
-
- if (((inList != null) && inList.contains(rvalue)) ^ not)
- {
- return Boolean.TRUE;
- }
- else
- {
- return Boolean.FALSE;
- }
-
- }
-
- public String toString()
- {
- StringBuffer answer = new StringBuffer();
- answer.append(right);
- answer.append(" ");
- answer.append(getExpressionSymbol());
- answer.append(" ( ");
-
- int count = 0;
- for (Iterator i = inList.iterator(); i.hasNext();)
- {
- Object o = (Object) i.next();
- if (count != 0)
- {
- answer.append(", ");
- }
-
- answer.append(o);
- count++;
- }
-
- answer.append(" )");
-
- return answer.toString();
- }
-
- public String getExpressionSymbol()
- {
- if (not)
- {
- return "NOT IN";
- }
- else
- {
- return "IN";
- }
- }
- };
+ return new InExpression(right, inList, not);
}
- abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression
+ abstract static class BooleanUnaryExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
{
- public BooleanUnaryExpression(Expression left)
+ public BooleanUnaryExpression(Expression<E> left)
{
super(left);
}
- public boolean matches(AMQMessage message) throws AMQException
+ public boolean matches(Filterable<E> message) throws E
{
Object object = evaluate(message);
@@ -171,26 +85,9 @@
}
;
- public static BooleanExpression createNOT(BooleanExpression left)
+ public static<E extends Exception> BooleanExpression<E> createNOT(BooleanExpression<E> left)
{
- return new BooleanUnaryExpression(left)
- {
- public Object evaluate(AMQMessage message) throws AMQException
- {
- Boolean lvalue = (Boolean) right.evaluate(message);
- if (lvalue == null)
- {
- return null;
- }
-
- return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE;
- }
-
- public String getExpressionSymbol()
- {
- return "NOT";
- }
- };
+ return new NotExpression(left);
}
public static BooleanExpression createXPath(final String xpath)
@@ -203,36 +100,9 @@
return new XQueryExpression(xpath);
}
- public static BooleanExpression createBooleanCast(Expression left)
+ public static<E extends Exception> BooleanExpression createBooleanCast(Expression<E> left)
{
- return new BooleanUnaryExpression(left)
- {
- public Object evaluate(AMQMessage message) throws AMQException
- {
- Object rvalue = right.evaluate(message);
- if (rvalue == null)
- {
- return null;
- }
-
- if (!rvalue.getClass().equals(Boolean.class))
- {
- return Boolean.FALSE;
- }
-
- return ((Boolean) rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE;
- }
-
- public String toString()
- {
- return right.toString();
- }
-
- public String getExpressionSymbol()
- {
- return "";
- }
- };
+ return new BooleanCastExpression(left);
}
private static Number negate(Number left)
@@ -281,7 +151,7 @@
this.right = left;
}
- public Expression getRight()
+ public Expression<E> getRight()
{
return right;
}
@@ -334,4 +204,166 @@
*/
public abstract String getExpressionSymbol();
+ private static class NegativeExpression<E extends Exception> extends UnaryExpression<E>
+ {
+ public NegativeExpression(final Expression<E> left)
+ {
+ super(left);
+ }
+
+ public Object evaluate(Filterable<E> message) throws E
+ {
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ {
+ return null;
+ }
+
+ if (rvalue instanceof Number)
+ {
+ return negate((Number) rvalue);
+ }
+
+ return null;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "-";
+ }
+ }
+
+ private static class InExpression<E extends Exception> extends BooleanUnaryExpression<E>
+ {
+ private final Collection _inList;
+ private final boolean _not;
+
+ public InExpression(final PropertyExpression<E> right, final Collection inList, final boolean not)
+ {
+ super(right);
+ _inList = inList;
+ _not = not;
+ }
+
+ public Object evaluate(Filterable<E> message) throws E
+ {
+
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ {
+ return null;
+ }
+
+ if (rvalue.getClass() != String.class)
+ {
+ return null;
+ }
+
+ if (((_inList != null) && _inList.contains(rvalue)) ^ _not)
+ {
+ return Boolean.TRUE;
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+
+ }
+
+ public String toString()
+ {
+ StringBuffer answer = new StringBuffer();
+ answer.append(right);
+ answer.append(" ");
+ answer.append(getExpressionSymbol());
+ answer.append(" ( ");
+
+ int count = 0;
+ for (Iterator i = _inList.iterator(); i.hasNext();)
+ {
+ Object o = (Object) i.next();
+ if (count != 0)
+ {
+ answer.append(", ");
+ }
+
+ answer.append(o);
+ count++;
+ }
+
+ answer.append(" )");
+
+ return answer.toString();
+ }
+
+ public String getExpressionSymbol()
+ {
+ if (_not)
+ {
+ return "NOT IN";
+ }
+ else
+ {
+ return "IN";
+ }
+ }
+ }
+
+ private static class NotExpression<E extends Exception> extends BooleanUnaryExpression<E>
+ {
+ public NotExpression(final BooleanExpression<E> left)
+ {
+ super(left);
+ }
+
+ public Object evaluate(Filterable<E> message) throws E
+ {
+ Boolean lvalue = (Boolean) right.evaluate(message);
+ if (lvalue == null)
+ {
+ return null;
+ }
+
+ return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "NOT";
+ }
+ }
+
+ private static class BooleanCastExpression<E extends Exception> extends BooleanUnaryExpression<E>
+ {
+ public BooleanCastExpression(final Expression<E> left)
+ {
+ super(left);
+ }
+
+ public Object evaluate(Filterable<E> message) throws E
+ {
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ {
+ return null;
+ }
+
+ if (!rvalue.getClass().equals(Boolean.class))
+ {
+ return Boolean.FALSE;
+ }
+
+ return ((Boolean) rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ public String toString()
+ {
+ return right.toString();
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "";
+ }
+ }
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Sun May 11 08:22:03 2008
@@ -23,6 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -70,7 +71,7 @@
private final XPathEvaluator evaluator;
static public interface XPathEvaluator {
- public boolean evaluate(AMQMessage message) throws AMQException;
+ public boolean evaluate(Filterable message) throws AMQException;
}
XPathExpression(String xpath) {
@@ -92,7 +93,7 @@
}
}
- public Object evaluate(AMQMessage message) throws AMQException {
+ public Object evaluate(Filterable message) throws AMQException {
// try {
//FIXME this is flow to disk work
// if( message.isDropped() )
@@ -117,7 +118,7 @@
* @return true if the expression evaluates to Boolean.TRUE.
* @throws AMQException
*/
- public boolean matches(AMQMessage message) throws AMQException
+ public boolean matches(Filterable message) throws AMQException
{
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Sun May 11 08:22:03 2008
@@ -19,6 +19,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
//
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
@@ -35,7 +36,7 @@
this.xpath = xpath;
}
- public Object evaluate(AMQMessage message) throws AMQException {
+ public Object evaluate(Filterable message) throws AMQException {
return Boolean.FALSE;
}
@@ -48,7 +49,7 @@
* @return true if the expression evaluates to Boolean.TRUE.
* @throws AMQException
*/
- public boolean matches(AMQMessage message) throws AMQException
+ public boolean matches(Filterable message) throws AMQException
{
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Sun May 11 08:22:03 2008
@@ -29,6 +29,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.Filterable;
import org.apache.xpath.CachedXPathAPI;
import org.w3c.dom.Document;
import org.w3c.dom.traversal.NodeIterator;
@@ -42,7 +43,7 @@
this.xpath = xpath;
}
- public boolean evaluate(AMQMessage m) throws AMQException
+ public boolean evaluate(Filterable m) throws AMQException
{
// TODO - we would have to check the content type and then evaluate the content
// here... is this really a feature we wish to implement? - RobG
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java Sun May 11 08:22:03 2008
@@ -54,4 +54,9 @@
notifyListeners(suspended);
}
}
+
+ protected final void notifyIncreaseBytesCredit()
+ {
+ notifyListeners(false);
+ }
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java Sun May 11 08:22:03 2008
@@ -28,7 +28,7 @@
{
private final AtomicLong _messageCredit;
- MessageOnlyCreditManager(final long initialCredit)
+ public MessageOnlyCreditManager(final long initialCredit)
{
_messageCredit = new AtomicLong(initialCredit);
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java Sun May 11 08:22:03 2008
@@ -84,8 +84,10 @@
public synchronized void addCredit(final long messageCredit, final long bytesCredit)
{
final long messageCreditLimit = _messageCreditLimit;
+ boolean notifyIncrease = true;
if(messageCreditLimit != 0L)
{
+ notifyIncrease = (_messageCredit != 0);
long newCredit = _messageCredit + messageCredit;
_messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit;
}
@@ -96,8 +98,14 @@
{
long newCredit = _bytesCredit + bytesCredit;
_bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit;
+ if(notifyIncrease && bytesCredit>0)
+ {
+ notifyIncreaseBytesCredit();
+ }
}
+
+
setSuspended(!hasCredit());
}
@@ -138,7 +146,7 @@
}
else
{
- setSuspended(true);
+ //setSuspended(true);
return false;
}
}
@@ -166,7 +174,7 @@
}
else
{
- setSuspended(true);
+ //setSuspended(true);
return false;
}
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Sun May 11 08:22:03 2008
@@ -26,11 +26,21 @@
import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
+import org.apache.qpid.server.subscription.ClientDeliveryMethod;
+import org.apache.qpid.server.subscription.RecordDeliveryMethod;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueueImpl;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -86,7 +96,7 @@
//Perform ACLs
vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue);
- if (!queue.performGet(session, channel, !body.getNoAck()))
+ if (!performGet(queue,session, channel, !body.getNoAck()))
{
MethodRegistry methodRegistry = session.getMethodRegistry();
// TODO - set clusterId
@@ -98,4 +108,80 @@
}
}
}
+
+ /**
+ * Attempts to deliver a single message from the queue in response to a basic.get.
+ *
+ * A temporary subscription with a credit of exactly one message is registered on the queue,
+ * flushed, and then unregistered again.
+ *
+ * @param queue the queue to take a message from
+ * @param session the protocol session the get-ok is written to
+ * @param channel the channel the get was issued on
+ * @param acks true if the delivered message must be acknowledged by the client
+ * @return true if the single message credit has been used up, false otherwise
+ * @throws AMQException if creating, registering, flushing or unregistering the subscription fails
+ */
+ public static boolean performGet(final AMQQueue queue,
+ final AMQProtocolSession session,
+ final AMQChannel channel,
+ final boolean acks)
+ throws AMQException
+ {
+
+ final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
+
+ final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
+ {
+
+ public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ throws AMQException
+ {
+ singleMessageCredit.useCreditForMessage(entry.getMessage());
+ session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
+ deliveryTag, queue.getMessageCount());
+
+ }
+ };
+ final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+ {
+
+ public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ {
+ channel.addUnacknowledgedMessage(entry, deliveryTag, null);
+ }
+ };
+
+ Subscription sub;
+ if(acks)
+ {
+ sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+ else
+ {
+ sub = new GetNoAckSubscription(channel,
+ session,
+ null,
+ null,
+ false,
+ singleMessageCredit,
+ getDeliveryMethod,
+ getRecordMethod);
+ }
+
+ queue.registerSubscription(sub,false);
+ try
+ {
+ queue.flushSubscription(sub);
+ }
+ finally
+ {
+ // always remove the temporary subscription, even when the flush fails,
+ // so that a failed get does not leave it attached to the queue
+ queue.unregisterSubscription(sub);
+ }
+ return(!singleMessageCredit.hasCredit());
+
+
+ }
+
+ /**
+ * The no-ack subscription used by performGet when the client did not request acknowledgements.
+ * It differs from its superclass only in how wouldSuspend is answered.
+ */
+ public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
+ {
+ public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
+ throws AMQException
+ {
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
+ }
+
+ /**
+ * Asks the credit manager to use credit for the message and reports a suspension when it refuses.
+ * NOTE(review): useCreditForMessage presumably consumes the credit as a side effect, so this
+ * query is not idempotent - confirm against FlowCreditManager.
+ */
+ public boolean wouldSuspend(QueueEntry msg)
+ {
+ return !getCreditManager().useCreditForMessage(msg.getMessage());
+ }
+
+ }
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Sun May 11 08:22:03 2008
@@ -174,7 +174,9 @@
{
final QueueRegistry registry = virtualHost.getQueueRegistry();
AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost);
+
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), virtualHost,
+ body.getArguments());
if (body.getExclusive() && !body.getDurable())
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Sun May 11 08:22:03 2008
@@ -42,7 +42,7 @@
/**
* A deliverable message.
*/
-public class AMQMessage
+public class AMQMessage implements Filterable<AMQException>
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
Added: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Sun May 11 08:22:03 2008
@@ -0,0 +1,41 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.AMQException;
+
+/**
+ * A {@link SimpleAMQQueue} whose entry list is produced by a {@link PriorityQueueList.Factory}.
+ */
+public class AMQPriorityQueue extends SimpleAMQQueue
+{
+ /**
+ * @param priorities the number of priority levels handed to the PriorityQueueList.Factory
+ * @throws AMQException declared by the constructor and passed through from the superclass constructor
+ */
+ protected AMQPriorityQueue(final AMQShortString name,
+ final boolean durable,
+ final AMQShortString owner,
+ final boolean autoDelete,
+ final VirtualHost virtualHost,
+ int priorities)
+ throws AMQException
+ {
+ super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
+ }
+
+
+}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sun May 11 08:22:03 2008
@@ -34,7 +34,7 @@
import java.util.List;
import java.util.Set;
-public interface AMQQueue extends Managable, Comparable
+public interface AMQQueue extends Managable, Comparable<AMQQueue>
{
AMQShortString getName();
@@ -147,11 +147,6 @@
void deliverAsync();
-
- boolean performGet(final AMQProtocolSession session, final AMQChannel channel, final boolean b) throws AMQException;
-
-
-
void addQueueDeleteTask(final Task task);
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
@@ -160,6 +155,8 @@
Set<NotificationCheck> getNotificationChecks();
+ void flushSubscription(final Subscription sub) throws AMQException;
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun May 11 08:22:03 2008
@@ -21,20 +21,32 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
public class AMQQueueFactory
{
+ private static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+
+ /**
+ * Creates a queue, choosing the implementation from the queue-declare arguments.
+ *
+ * An {@link AMQPriorityQueue} is created when the x-qpid-priorities argument yields a value
+ * greater than 1; otherwise (including when no arguments are supplied) a {@link SimpleAMQQueue}
+ * is created.
+ *
+ * @param arguments the queue-declare arguments; may be null
+ * @throws AMQException passed through from the queue constructor
+ */
public static AMQQueue createAMQQueueImpl(AMQShortString name,
- boolean durable,
- AMQShortString owner,
- boolean autoDelete,
- VirtualHost virtualHost)
+ boolean durable,
+ AMQShortString owner,
+ boolean autoDelete,
+ VirtualHost virtualHost, final FieldTable arguments)
throws AMQException
{
- //return new AMQQueueImpl(name, durable, owner, autoDelete, virtualHost);
- return new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
+
+ // Tolerate a null argument table and a null lookup result: both fall back to a single
+ // priority instead of failing with a NullPointerException.
+ final Integer requested = (arguments == null) ? null : arguments.getInteger(X_QPID_PRIORITIES);
+ final int priorities = (requested == null) ? 1 : requested.intValue();
+
+ if(priorities > 1)
+ {
+ return new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities);
+ }
+ else
+ {
+ return new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
+ }
}
}
Added: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java Sun May 11 08:22:03 2008
@@ -0,0 +1,33 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
+
+/**
+ * The message properties that can be queried through this interface.
+ *
+ * @param <E> the exception type the accessors may throw; AMQMessage binds it to AMQException,
+ * IncomingMessage binds it to RuntimeException
+ */
+public interface Filterable<E extends Exception>
+{
+ /** @return the content header of the message */
+ ContentHeaderBody getContentHeaderBody() throws E;
+
+ /** @return whether the message is persistent */
+ boolean isPersistent() throws E;
+
+ /** @return whether the message is flagged as redelivered */
+ boolean isRedelivered();
+}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Sun May 11 08:22:03 2008
@@ -38,8 +38,9 @@
import java.util.List;
import java.util.ArrayList;
+import java.util.Collection;
-public class IncomingMessage
+public class IncomingMessage implements Filterable<RuntimeException>
{
/** Used for debugging purposes. */
@@ -66,7 +67,7 @@
* delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
* by the message handle.
*/
- private List<AMQQueue> _destinationQueues;
+ private Collection<AMQQueue> _destinationQueues;
private AMQProtocolSession _publisher;
private MessageStore _messageStore;
@@ -160,7 +161,7 @@
// we get a reference to the destination queues now so that we can clear the
// transient message data as quickly as possible
- List<AMQQueue> destinationQueues = _destinationQueues;
+ Collection<AMQQueue> destinationQueues = _destinationQueues;
if (_logger.isDebugEnabled())
{
_logger.debug("Delivering message " + _messageId + " to " + destinationQueues);
@@ -175,9 +176,6 @@
_messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
_messagePublishInfo, getContentHeaderBody());
- // we then allow the transactional context to do something with the message content
- // now that it has all been received, before we attempt delivery
- _txnContext.messageFullyReceived(isPersistent());
message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
message.setPublisherIdentifier(_publisher.getClientIdentifier());
@@ -214,6 +212,10 @@
}
}
+ // we then allow the transactional context to do something with the message content
+ // now that it has all been received, before we attempt delivery
+ _txnContext.messageFullyReceived(isPersistent());
+
return message;
}
finally
@@ -283,6 +285,11 @@
((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == 2;
}
+ public boolean isRedelivered()
+ {
+ return false;
+ }
+
public void setMessageStore(final MessageStore messageStore)
{
_messageStore = messageStore;
@@ -303,7 +310,7 @@
_exchange.route(this);
}
- public void enqueue(final List<AMQQueue> queues)
+ public void enqueue(final Collection<AMQQueue> queues)
{
_destinationQueues = queues;
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java Sun May 11 08:22:03 2008
@@ -126,7 +126,7 @@
* @param age maximum age of message.
* @throws IOException
*/
- @MBeanAttribute(name="MaximumMessageAge", description="Threshold high value for message age on thr broker")
+ @MBeanAttribute(name="MaximumMessageAge", description="Threshold high value for message age on the broker")
void setMaximumMessageAge(Long age) throws IOException;
/**
Added: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Sun May 11 08:22:03 2008
@@ -0,0 +1,164 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.AMQException;
+
+/**
+ * A {@link QueueEntryList} that keeps one {@link SimpleQueueEntryList} per priority level and
+ * walks those lists from the highest array index down to index 0.
+ */
+public class PriorityQueueList implements QueueEntryList
+{
+ private final AMQQueue _queue;
+ // one sub-list per priority level; a higher index is visited earlier by next() and by the iterator
+ private final QueueEntryList[] _priorityLists;
+ private final int _priorities;
+ // subtracted from a message's priority to obtain its index into _priorityLists
+ private final int _priorityOffset;
+
+ /**
+ * @param queue the queue handed to every sub-list
+ * @param priorities the number of sub-lists to create
+ */
+ public PriorityQueueList(AMQQueue queue, int priorities)
+ {
+ _queue = queue;
+ _priorityLists = new QueueEntryList[priorities];
+ _priorities = priorities;
+ // e.g. priorities == 10 gives offset 0; priorities == 3 gives offset 3,
+ // so message priorities 3, 4 and 5 map to indices 0, 1 and 2
+ _priorityOffset = 5-((priorities + 1)/2);
+ for(int i = 0; i < priorities; i++)
+ {
+ _priorityLists[i] = new SimpleQueueEntryList(queue);
+ }
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ /**
+ * Adds the message to the sub-list selected by its priority; an index outside the array is
+ * clamped to the first or last sub-list.
+ *
+ * @throws RuntimeException wrapping any AMQException raised while reading the content header
+ */
+ public QueueEntry add(AMQMessage message)
+ {
+ try
+ {
+ int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+ if(index >= _priorities)
+ {
+ index = _priorities-1;
+ }
+ else if(index < 0)
+ {
+ index = 0;
+ }
+ return _priorityLists[index].add(message);
+ }
+ catch (AMQException e)
+ {
+ // TODO - fix AMQ Exception
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ /**
+ * Returns the entry following the given one: its successor within its own sub-list, or, when
+ * there is none, the first entry found after the head of a lower-index sub-list.
+ *
+ * @return the next entry, or null if no lower-index sub-list supplies one
+ */
+ public QueueEntry next(QueueEntry node)
+ {
+ QueueEntryImpl nodeImpl = (QueueEntryImpl)node;
+ QueueEntry next = nodeImpl.getNext();
+
+ if(next == null)
+ {
+ QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList();
+ int index;
+ // empty-bodied loop: scan down from the top until index identifies the sub-list holding node
+ // NOTE(review): assumes node belongs to one of this object's sub-lists; otherwise index runs below 0
+ for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--);
+
+ // step down through the lower-index sub-lists until one has an entry after its head
+ while(next == null && index != 0)
+ {
+ index--;
+ next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext();
+ }
+
+ }
+ return next;
+ }
+
+ /**
+ * Iterator over all sub-lists, backed by one iterator per sub-list.
+ */
+ private final class PriorityQueueEntryListIterator implements QueueEntryIterator
+ {
+ private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
+ // the node most recently produced by advance(), returned by getNode()
+ private QueueEntry _lastNode;
+
+ PriorityQueueEntryListIterator()
+ {
+ for(int i = 0; i < _priorityLists.length; i++)
+ {
+ _iterators[i] = _priorityLists[i].iterator();
+ }
+ // start positioned on the node of the highest-index sub-list's iterator
+ _lastNode = _iterators[_iterators.length - 1].getNode();
+ }
+
+
+ /** @return true only when every sub-list iterator reports that it is at its tail */
+ public boolean atTail()
+ {
+ for(int i = 0; i < _iterators.length; i++)
+ {
+ if(!_iterators[i].atTail())
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public QueueEntry getNode()
+ {
+ return _lastNode;
+ }
+
+ /**
+ * Tries the sub-list iterators from the highest index downwards; the first one able to
+ * advance supplies the current node.
+ *
+ * @return false if no sub-list iterator could advance
+ */
+ public boolean advance()
+ {
+ for(int i = _iterators.length-1; i >= 0; i--)
+ {
+ if(_iterators[i].advance())
+ {
+ _lastNode = _iterators[i].getNode();
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ public QueueEntryIterator iterator()
+ {
+ return new PriorityQueueEntryListIterator();
+ }
+
+ /** @return the head of the highest-index sub-list */
+ public QueueEntry getHead()
+ {
+ return _priorityLists[_priorities-1].getHead();
+ }
+
+ /**
+ * Creates PriorityQueueList instances with a fixed number of priority levels.
+ */
+ static class Factory implements QueueEntryListFactory
+ {
+ private final int _priorities;
+
+ Factory(int priorities)
+ {
+ _priorities = priorities;
+ }
+
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
+ {
+ return new PriorityQueueList(queue, _priorities);
+ }
+ }
+}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sun May 11 08:22:03 2008
@@ -26,6 +26,99 @@
*/
public interface QueueEntry extends Comparable<QueueEntry>
{
+
+
+
+ public static enum State
+ {
+ AVAILABLE,
+ ACQUIRED,
+ EXPIRED,
+ DEQUEUED
+ }
+
+ /**
+ * Callback for observers that want to be told when a queue entry changes state.
+ */
+ public static interface StateChangeListener
+ {
+ /**
+ * @param entry the entry whose state changed
+ * @param oldState the state before the change
+ * @param newState the state after the change
+ */
+ public void stateChanged(QueueEntry entry, State oldState, State newState);
+ }
+
+ public abstract class EntryState
+ {
+ private EntryState()
+ {
+ }
+
+ public abstract State getState();
+ }
+
+
+ public final class AvailableState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.AVAILABLE;
+ }
+ }
+
+
+ public final class DeletedState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.DEQUEUED;
+ }
+ }
+
+ public final class ExpiredState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.EXPIRED;
+ }
+ }
+
+
+ public final class NonSubscriptionAcquiredState extends EntryState
+ {
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+ }
+
+ public final class SubscriptionAcquiredState extends EntryState
+ {
+ private final Subscription _subscription;
+
+ public SubscriptionAcquiredState(Subscription subscription)
+ {
+ _subscription = subscription;
+ }
+
+
+ public State getState()
+ {
+ return State.ACQUIRED;
+ }
+
+ public Subscription getSubscription()
+ {
+ return _subscription;
+ }
+ }
+
+
+ final static EntryState AVAILABLE_STATE = new AvailableState();
+ final static EntryState DELETED_STATE = new DeletedState();
+ final static EntryState EXPIRED_STATE = new ExpiredState();
+ final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
+
+
+
+
AMQQueue getQueue();
AMQMessage getMessage();
@@ -38,8 +131,12 @@
boolean isAcquired();
+ boolean acquire();
boolean acquire(Subscription sub);
+ boolean delete();
+ boolean isDeleted();
+
boolean acquiredBySubscription();
void setDeliveredToSubscription();
@@ -71,4 +168,7 @@
void discard(StoreContext storeContext) throws AMQException;
boolean isQueueDeleted();
+
+ void addStateChangeListener(StateChangeListener listener);
+ boolean removeStateChangeListener(StateChangeListener listener);
}
Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun May 11 08:22:03 2008
@@ -27,8 +27,9 @@
import java.util.Set;
import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.CopyOnWriteArraySet;
public class QueueEntryImpl implements QueueEntry
@@ -39,42 +40,77 @@
*/
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
- private final AMQQueue _queue;
+ private final SimpleQueueEntryList _queueEntryList;
+
private final AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
- private final AtomicReference<Object> _owner = new AtomicReference<Object>();
- private final AtomicLong _entryId = new AtomicLong();
+ private volatile EntryState _state = AVAILABLE_STATE;
+
+ private static final
+ AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
+ _stateUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueEntryImpl.class, EntryState.class, "_state");
+
+
+ private volatile Set<StateChangeListener> _stateChangeListeners;
+
+ private static final
+ AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
+ _listenersUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
+
+ private static final
+ AtomicLongFieldUpdater<QueueEntryImpl>
+ _entryIdUpdater =
+ AtomicLongFieldUpdater.newUpdater
+ (QueueEntryImpl.class, "_entryId");
- public QueueEntryImpl(AMQQueue queue, AMQMessage message, final long entryId)
+
+ private volatile long _entryId;
+
+ volatile QueueEntryImpl _next;
+
+
+ QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
- _queue = queue;
+ this(queueEntryList,null,Long.MIN_VALUE);
+ _state = DELETED_STATE;
+ }
+
+
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
+ {
+ _queueEntryList = queueEntryList;
_message = message;
- _entryId.set(entryId);
+
+ _entryIdUpdater.set(this, entryId);
}
- public QueueEntryImpl(AMQQueue queue, AMQMessage message)
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
{
- _queue = queue;
+ _queueEntryList = queueEntryList;
_message = message;
}
protected void setEntryId(long entryId)
{
- _entryId.set(entryId);
+ _entryIdUpdater.set(this, entryId);
}
protected long getEntryId()
{
- return _entryId.get();
+ return _entryId;
}
public AMQQueue getQueue()
{
- return _queue;
+ return _queueEntryList.getQueue();
}
public AMQMessage getMessage()
@@ -94,23 +130,39 @@
public boolean expired() throws AMQException
{
- return getMessage().expired(_queue);
+ return getMessage().expired(getQueue());
}
public boolean isAcquired()
{
- return _owner.get() != null;
+ return _state.getState() == State.ACQUIRED;
+ }
+
+ public boolean acquire()
+ {
+ return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
+ }
+
+ private boolean acquire(final EntryState state)
+ {
+ boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
+ if(acquired && _stateChangeListeners != null)
+ {
+ notifyStateChange(State.AVAILABLE, State.ACQUIRED);
+ }
+
+ return acquired;
}
public boolean acquire(Subscription sub)
{
- return !(_owner.compareAndSet(null, sub == null ? this : sub));
+ return acquire(sub.getOwningState());
}
public boolean acquiredBySubscription()
{
- Object owner = _owner.get();
- return (owner != null) && (owner != this);
+
+ return (_state instanceof SubscriptionAcquiredState);
}
public void setDeliveredToSubscription()
@@ -120,7 +172,7 @@
public void release()
{
- _owner.set(null);
+ _stateUpdater.set(this,AVAILABLE_STATE);
}
public String debugIdentity()
@@ -141,18 +193,16 @@
public Subscription getDeliveredSubscription()
{
- synchronized (this)
- {
- Object owner = _owner.get();
- if (owner instanceof Subscription)
+ EntryState state = _state;
+ if (state instanceof SubscriptionAcquiredState)
{
- return (Subscription) owner;
+ return ((SubscriptionAcquiredState) state).getSubscription();
}
else
{
return null;
}
- }
+
}
public void reject()
@@ -193,25 +243,44 @@
public void requeue(final StoreContext storeContext) throws AMQException
{
- _queue.requeue(storeContext, this);
+ getQueue().requeue(storeContext, this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
}
public void dequeue(final StoreContext storeContext) throws FailedDequeueException
{
+
+
getQueue().dequeue(storeContext, this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(_state.getState() , QueueEntry.State.DEQUEUED);
+ }
+ }
+
+ private void notifyStateChange(final State oldState, final State newState)
+ {
+ for(StateChangeListener l : _stateChangeListeners)
+ {
+ l.stateChanged(this, oldState, newState);
+ }
}
public void dispose(final StoreContext storeContext) throws MessageCleanupException
{
getMessage().decrementReference(storeContext);
+ delete(); // then attempt the transition to DELETED_STATE; the boolean result is ignored
}
public void restoreCredit()
{
- Object owner = _owner.get();
- if(owner instanceof Subscription)
+ EntryState state = _state; // snapshot: _state can be replaced concurrently through _stateUpdater
+ if(state instanceof SubscriptionAcquiredState)
{
- Subscription s = (Subscription) owner;
+ Subscription s = ((SubscriptionAcquiredState) state).getSubscription(); // cast the snapshot that was type-checked, not a second read of _state, which could throw ClassCastException
s.restoreCredit(this);
}
}
@@ -232,9 +301,85 @@
return getQueue().isDeleted();
}
+ public void addStateChangeListener(StateChangeListener listener) // registers a listener, creating the listener set on first use
+ {
+ Set<StateChangeListener> listeners = _stateChangeListeners;
+ if(listeners == null)
+ {
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>()); // install a new set only if the field is still null
+ listeners = _stateChangeListeners; // re-read: picks up whichever set is installed, whether or not our compare-and-set won
+ }
+ // add to the set obtained above
+ listeners.add(listener);
+ }
+
+ public boolean removeStateChangeListener(StateChangeListener listener) // returns the set's remove() result, or false when no listener set exists
+ {
+ Set<StateChangeListener> listeners = _stateChangeListeners;
+ if(listeners != null)
+ {
+ return listeners.remove(listener);
+ }
+ // no listener set has been created, so there is nothing to remove
+ return false;
+ }
+
+
public int compareTo(final QueueEntry o)
{
QueueEntryImpl other = (QueueEntryImpl)o; // unchecked downcast: an argument that is not a QueueEntryImpl throws ClassCastException
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; // orders by entry id, yielding 1, -1 or 0
}
+
+ public QueueEntryImpl getNext() // returns the next entry that is not deleted, or null if none is found
+ {
+ // start from the raw successor link
+ QueueEntryImpl next = nextNode();
+ while(next != null && next.isDeleted())
+ {
+ // try to unlink the deleted successor by pointing this node at the entry after it
+ final QueueEntryImpl newNext = next.nextNode();
+ if(newNext != null)
+ {
+ SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); // result ignored: the link is re-read on the next line either way
+ next = nextNode();
+ }
+ else
+ {
+ next = null; // the deleted successor has no successor of its own: it stays linked and null is returned
+ }
+
+ }
+ return next;
+ }
+
+ QueueEntryImpl nextNode() // raw successor link; unlike getNext() it does not skip deleted entries
+ {
+ return _next;
+ }
+
+ public boolean isDeleted() // true once the state field holds DELETED_STATE
+ {
+ return _state == DELETED_STATE; // identity comparison against the DELETED_STATE object
+ }
+
+ public boolean delete() // returns true only for the call that performs the transition to DELETED_STATE
+ {
+ EntryState state = _state;
+ // single compare-and-set attempt against the state read above; NOTE(review): if the state changes in between, this returns false without deleting - confirm callers tolerate that
+ if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
+ {
+ _queueEntryList.advanceHead(); // invoked only by the call that won the transition
+ return true;
+ }
+ else
+ {
+ return false; // already deleted, or the compare-and-set failed
+ }
+ }
+
+ public QueueEntryList getQueueEntryList() // plain accessor for the _queueEntryList field
+ {
+ return _queueEntryList;
+ }
}
Added: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java Sun May 11 08:22:03 2008
@@ -0,0 +1,30 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+public interface QueueEntryIterator // cursor-style access to queue entries; NOTE(review): method contracts below are inferred from names only - confirm against the implementation
+{
+ boolean atTail(); // presumably true when the iterator is positioned at the last entry - TODO confirm
+ // presumably the entry at the current position - TODO confirm
+ QueueEntry getNode();
+ // presumably moves to the following entry and reports whether it moved - TODO confirm
+ boolean advance();
+}
Added: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Sun May 11 08:22:03 2008
@@ -0,0 +1,34 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+public interface QueueEntryList // list of QueueEntry objects tied to an AMQQueue; NOTE(review): method contracts below are inferred from signatures only - confirm against the implementation
+{
+ AMQQueue getQueue(); // the queue associated with this list
+ // presumably wraps the message in a new entry, appends it and returns that entry - TODO confirm
+ QueueEntry add(AMQMessage message);
+ // presumably the entry following the given node - TODO confirm whether deleted entries are skipped
+ QueueEntry next(QueueEntry node);
+ // supplies a QueueEntryIterator over this list
+ QueueEntryIterator iterator();
+ // presumably the entry at the head of the list - TODO confirm
+ QueueEntry getHead();
+}