You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/08/05 17:12:51 UTC
svn commit: r562885 [1/2] - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient:
./ jms/ jms/filter/
Author: arnaudsimon
Date: Sun Aug 5 08:12:50 2007
New Revision: 562885
URL: http://svn.apache.org/viewvc?view=rev&rev=562885
Log:
Added message selector
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ArithmeticExpression.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BinaryExpression.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BooleanExpression.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ComparisonExpression.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ConstantExpression.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/Expression.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/JMSSelectorFilter.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/LogicExpression.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/MessageFilter.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/PropertyExpression.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/UnaryExpression.java (with props)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java?view=diff&rev=562885&r1=562884&r2=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java Sun Aug 5 08:12:50 2007
@@ -33,8 +33,16 @@
*/
public interface Session
{
+ public static final short ACQUIRE_MODE_NO_ACQUIRE = 0;
+ public static final short ACQUIRE_MODE_PRE_ACQUIRE = 1;
+ public static final short CONFIRM_MODE_REQUIRED = 1;
+ public static final short CONFIRM_MODE_NOT_REQUIRED = 0;
+ public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
+ public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
+ public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
+ public static final short MESSAGE_FLOW_UNIT_BYTE = 1;
- //------------------------------------------------------
+ //------------------------------------------------------
// Session housekeeping methods
//------------------------------------------------------
/**
@@ -154,19 +162,27 @@
* <ul>
* <li> NO_LOCAL
* <li> EXCLUSIVE
- * <li> NO_ACQUIRE
- * <li> CONFIRM
* </ul>
* <p> In the absence of a particular option, defaul values are:
* <ul>
* <li> NO_LOCAL = false
* <li> EXCLUSIVE = false
- * <li> PRE-ACCQUIRE
- * <li> CONFIRM = false
* </ul>
*
* @param queue The queue this receiver is receiving messages from.
* @param destination The destination for the subscriber ,a.k.a the delivery tag.
+ * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire
+ * mode (or once acquire has been sent in no-acquire mode) the message is considered
+ * transferred
+ * <p/>
+ * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or
+ * explicit as in no-acquire mode) is not considered transferred until the original
+ * transfer is complete (signaled via execution.complete)
+ * </ul>
+ * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired
+ * <p/>
+ * <li> pre-acquire (1): the message is acquired when the transfer starts
+ * </ul>
* @param listener The listener for this destination. When big message are transfered then
* it is recommended to use a {@link MessagePartListener}.
* @param options Set of Options.
@@ -174,8 +190,9 @@
* on the providers implementation.
* @throws QpidException If the session fails to create the receiver due to some error.
*/
- public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map<String, ?> filter,
- Option... options) throws QpidException;
+ public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
+ MessagePartListener listener, Map<String, ?> filter, Option... options)
+ throws QpidException;
/**
* This method cancels a consumer. This does not affect already delivered messages, but it does
@@ -262,7 +279,7 @@
* further credit is received.
*
* @param destination The destination to stop.
- * @throws QpidException If stopping fails due to some error.
+ * @throws QpidException If stopping fails due to some error.
*/
public void messageStop(String destination) throws QpidException;
@@ -274,7 +291,7 @@
* @param range Range of acknowledged messages.
* @throws QpidException If the acknowledgement of the messages fails due to some error.
*/
- public void messageAcknowledge(Range... range) throws QpidException;
+ public void messageAcknowledge(Range<Long>... range) throws QpidException;
/**
* Reject ranges of acquired messages.
@@ -284,7 +301,7 @@
* @param range Range of rejected messages.
* @throws QpidException If those messages cannot be rejected dus to some error
*/
- public void messageReject(Range... range) throws QpidException;
+ public void messageReject(Range<Long>... range) throws QpidException;
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -298,7 +315,7 @@
* @return Ranges of explicitly acquired messages.
* @throws QpidException If this message cannot be acquired dus to some error
*/
- public Range[] messageAcquire(Range... range) throws QpidException;
+ public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException;
/**
* Give up responsibility for processing ranges of messages.
@@ -307,7 +324,7 @@
* @param range Ranges of messages to be released.
* @throws QpidException If this message cannot be released dus to some error.
*/
- public void messageRelease(Range... range) throws QpidException;
+ public void messageRelease(Range<Long>... range) throws QpidException;
// -----------------------------------------------
// Local transaction methods
@@ -359,8 +376,8 @@
* @throws QpidException If the session fails to declare the queue due to some error.
* @see Option
*/
- public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments,
- Option... options) throws QpidException;
+ public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options)
+ throws QpidException;
/**
* Bind a queue with an exchange.
@@ -370,8 +387,8 @@
* @param routingKey The routing key.
* @throws QpidException If the session fails to bind the queue due to some error.
*/
- public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
- QpidException;
+ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
+ throws QpidException;
/**
* Unbind a queue from an exchange.
@@ -381,8 +398,8 @@
* @param routingKey The routing key.
* @throws QpidException If the session fails to unbind the queue due to some error.
*/
- public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
- QpidException;
+ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments)
+ throws QpidException;
/**
* Purge a queue. i.e. delete all enqueued messages
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java?view=diff&rev=562885&r1=562884&r2=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java Sun Aug 5 08:12:50 2007
@@ -20,6 +20,8 @@
//import org.apache.qpid.nclient.api.MessageReceiver;
import org.apache.qpid.nclient.jms.message.QpidMessage;
+import org.apache.qpid.nclient.jms.filter.JMSSelectorFilter;
+import org.apache.qpid.nclient.jms.filter.MessageFilter;
import org.apache.qpid.nclient.impl.MessagePartListenerAdapter;
import org.apache.qpid.nclient.MessagePartListener;
import org.apache.qpidity.Range;
@@ -33,7 +35,8 @@
*/
public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
- public static final short MESSAGE_FLOW_MODE = 0; // we use message flow mode
+ // we can receive up to 100 messages for an asynchronous listener
+ public static final int MAX_MESSAGE_TRANSFERRED = 100;
/**
* This MessageConsumer's messageselector.
@@ -41,6 +44,11 @@
private String _messageSelector = null;
/**
+ * The message selector filter associated with this consumer message selector
+ */
+ private MessageFilter _filter = null;
+
+ /**
* NoLocal
* If true, and the destination is a topic then inhibits the delivery of messages published
* by its own connection. The behavior for NoLocal is not specified if the destination is a queue.
@@ -53,6 +61,11 @@
protected String _subscriptionName;
/**
+ * Indicates whether this consumer receives pre-acquired messages
+ */
+ private boolean _preAcquire = true;
+
+ /**
* A MessagePartListener set up for this consumer.
*/
private MessageListener _messageListener;
@@ -71,7 +84,17 @@
* Indicates that this consumer is receiving a synch message
*/
private boolean _isReceiving = false;
-
+
+ /**
+ * Indicates that a nowait is receiving a message.
+ */
+ private boolean _isNoWaitIsReceiving = false;
+
+ /**
+ * Number of mesages received asynchronously
+ * Nether exceed MAX_MESSAGE_TRANSFERRED
+ */
+ private int _messageAsyncrhonouslyReceived = 0;
//----- Constructors
/**
@@ -89,7 +112,11 @@
boolean noLocal, String subscriptionName) throws Exception
{
super(session, destination);
- _messageSelector = messageSelector;
+ if (messageSelector != null)
+ {
+ _messageSelector = messageSelector;
+ _filter = new JMSSelectorFilter(messageSelector);
+ }
_noLocal = noLocal;
_subscriptionName = subscriptionName;
_isStopped = getSession().isStopped();
@@ -102,24 +129,37 @@
* asynchronous or directly to this consumer when it is synchronously accessed.
*/
MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
- // we use the default options: EXCLUSIVE = false, PRE-ACCQUIRE and CONFIRM = off
- if (_noLocal)
- {
- getSession().getQpidSession()
- .messageSubscribe(destination.getName(), getMessageActorID(), messageAssembler, null,
- Option.NO_LOCAL);
- }
- else
+ getSession().getQpidSession()
+ .messageSubscribe(destination.getName(), getMessageActorID(),
+ org.apache.qpid.nclient.Session.CONFIRM_MODE_NOT_REQUIRED,
+ // When the message selctor is set we do not acquire the messages
+ _messageSelector != null ? org.apache.qpid.nclient.Session.ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpid.nclient.Session.ACQUIRE_MODE_PRE_ACQUIRE,
+ messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION);
+ if (_messageSelector != null)
{
- getSession().getQpidSession()
- .messageSubscribe(destination.getName(), getMessageActorID(), messageAssembler, null);
+ _preAcquire = false;
}
}
else
{
// this is a topic we need to create a temporary queue for this consumer
// unless this is a durable subscriber
+ if (subscriptionName != null)
+ {
+ // this ia a durable subscriber
+ // create a persistent queue for this subscriber
+ // getSession().getQpidSession().queueDeclare(destination.getName());
+ }
+ else
+ {
+ // this is a non durable subscriber
+ // create a temporary queue
+
+ }
}
+ // set the flow mode
+ getSession().getQpidSession()
+ .messageFlowMode(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_MODE_CREDIT);
}
//----- Message consumer API
@@ -168,7 +208,35 @@
// this method is synchronized as onMessage also access _messagelistener
// onMessage, getMessageListener and this method are the only synchronized methods
checkNotClosed();
- _messageListener = messageListener;
+ try
+ {
+ _messageListener = messageListener;
+ if (messageListener != null)
+ {
+ resetAsynchMessageReceived();
+ }
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Contact the broker and ask for the delivery of MAX_MESSAGE_TRANSFERRED messages
+ *
+ * @throws QpidException If there is a communication error
+ */
+ private void resetAsynchMessageReceived() throws QpidException
+ {
+ if (!_isStopped && _messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
+ {
+ getSession().getQpidSession().messageStop(getMessageActorID());
+ }
+ _messageAsyncrhonouslyReceived = 0;
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ MAX_MESSAGE_TRANSFERRED);
}
/**
@@ -265,7 +333,8 @@
if (!_isStopped)
{
// if this consumer is stopped then this will be call when starting
- getSession().getQpidSession().messageFlow(getMessageActorID(), MESSAGE_FLOW_MODE, 1);
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
received = getSession().getQpidSession().messageFlush(getMessageActorID());
}
if (!received && timeout < 0)
@@ -275,6 +344,11 @@
}
else
{
+ // right we need to let onMessage know that a nowait is potentially waiting for a message
+ if (timeout < 0)
+ {
+ _isNoWaitIsReceiving = true;
+ }
while (_incomingMessage == null && !_isClosed)
{
try
@@ -295,9 +369,10 @@
getSession().acknowledgeMessage(_incomingMessage);
}
_incomingMessage = null;
- // We now release any message received for this consumer
- _isReceiving = false;
}
+ // We now release any message received for this consumer
+ _isReceiving = false;
+ _isNoWaitIsReceiving = false;
}
return result;
}
@@ -329,7 +404,8 @@
{
// there is a synch call waiting for a message to be delivered
// so tell the broker to deliver a message
- getSession().getQpidSession().messageFlow(getMessageActorID(), MESSAGE_FLOW_MODE, 1);
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
getSession().getQpidSession().messageFlush(getMessageActorID());
}
}
@@ -344,64 +420,109 @@
{
try
{
+ // if there is a message selector then we need to evaluate it.
+ boolean messageOk = true;
+ if (_messageSelector != null)
+ {
+ messageOk = _filter.matches(message.getJMSMessage());
+ }
+ // right now we need to acquire this message if needed
+ if (!_preAcquire && messageOk)
+ {
+ messageOk = acquireMessage(message);
+ }
+
// if this consumer is synchronous then set the current message and
// notify the waiting thread
if (_messageListener == null)
{
synchronized (_incomingMessageLock)
{
- if (_isReceiving)
+ if (messageOk)
{
- _incomingMessage = message;
- _incomingMessageLock.notify();
+ // we have received a proper message that we can deliver
+ if (_isReceiving)
+ {
+ _incomingMessage = message;
+ _incomingMessageLock.notify();
+ }
+ else
+ {
+ // this message has been received after a received as returned
+ // we need to release it
+ releaseMessage(message);
+ }
}
else
{
- // this message has been received after a received as returned
- // we need to release it
- releaseMessage(message);
+ // oups the message did not match the selector or we did not manage to acquire it
+ // If the receiver is still waiting for a message
+ // then we need to request a new one from the server
+ if (_isReceiving)
+ {
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(),
+ org.apache.qpid.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ boolean received = getSession().getQpidSession().messageFlush(getMessageActorID());
+ if (!received && _isNoWaitIsReceiving)
+ {
+ // Right a message nowait is waiting for a message
+ // but no one can be delivered it then need to return
+ _incomingMessageLock.notify();
+ }
+ }
}
}
}
else
{
- // This is an asynchronous message
- // tell the session that a message is in process
- getSession().preProcessMessage(message);
- // If the session is transacted we need to ack the message first
- // This is because a message is associated with its tx only when acked
- if (getSession().getTransacted())
- {
- getSession().acknowledgeMessage(message);
- }
- // The JMS specs says:
- /* The result of a listener throwing a RuntimeException depends on the session?s
- * acknowledgment mode.
- ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
- * will be immediately redelivered. The number of times a JMS provider will
- * redeliver the same message before giving up is provider-dependent.
- ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
- * --- Transacted Session - the next message for the listener is delivered.
- *
- * The number of time we try redelivering the message is 0
- **/
- try
- {
- _messageListener.onMessage(message.getJMSMessage());
- }
- catch (RuntimeException re)
+ _messageAsyncrhonouslyReceived++;
+ if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
{
- // do nothing as this message will not be redelivered
+ // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
+ resetAsynchMessageReceived();
}
- // If the session has been recovered we then need to redelivered this message
- if (getSession().isInRecovery())
+ // only deliver the message if it is valid
+ if (messageOk)
{
- releaseMessage(message);
- }
- else if (!getSession().getTransacted())
- {
- // Tell the jms Session to ack this message if required
- getSession().acknowledgeMessage(message);
+ // This is an asynchronous message
+ // tell the session that a message is in process
+ getSession().preProcessMessage(message);
+ // If the session is transacted we need to ack the message first
+ // This is because a message is associated with its tx only when acked
+ if (getSession().getTransacted())
+ {
+ getSession().acknowledgeMessage(message);
+ }
+ // The JMS specs says:
+ /* The result of a listener throwing a RuntimeException depends on the session?s
+ * acknowledgment mode.
+ ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+ * will be immediately redelivered. The number of times a JMS provider will
+ * redeliver the same message before giving up is provider-dependent.
+ ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+ * --- Transacted Session - the next message for the listener is delivered.
+ *
+ * The number of time we try redelivering the message is 0
+ **/
+ try
+ {
+ _messageListener.onMessage(message.getJMSMessage());
+ }
+ catch (RuntimeException re)
+ {
+ // do nothing as this message will not be redelivered
+ }
+ // If the session has been recovered we then need to redelivered this message
+ if (getSession().isInRecovery())
+ {
+ releaseMessage(message);
+ }
+ else if (!getSession().getTransacted())
+ {
+ // Tell the jms Session to ack this message if required
+ getSession().acknowledgeMessage(message);
+ }
}
}
}
@@ -415,27 +536,37 @@
* Release a message
*
* @param message The message to be released
- * @throws JMSException If the message cannot be released due to some internal error.
+ * @throws QpidException If the message cannot be released due to some internal error.
*/
- private void releaseMessage(QpidMessage message) throws JMSException
+ private void releaseMessage(QpidMessage message) throws QpidException
{
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
- try
+ if (_preAcquire)
{
+ Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
getSession().getQpidSession().messageRelease(range);
}
- catch (QpidException e)
+ }
+
+ /**
+ * Acquire a message
+ *
+ * @param message The message to be acquired
+ * @return true if the message has been acquired, false otherwise.
+ * @throws QpidException If the message cannot be acquired due to some internal error.
+ */
+ private boolean acquireMessage(QpidMessage message) throws QpidException
+ {
+ boolean result = false;
+ if (!_preAcquire)
{
- // notify the Exception listener
- if (getSession().getConnection().getExceptionListener() != null)
- {
- getSession().getConnection().getExceptionListener()
- .onException(ExceptionHelper.convertQpidExceptionToJMSException(e));
- }
- if (_logger.isDebugEnabled())
+ Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+
+ Range<Long>[] rangeResult = getSession().getQpidSession().messageAcquire(range);
+ if (rangeResult.length > 0)
{
- _logger.debug("Excpetion when releasing message " + message, e);
+ result = rangeResult[0].getLower().compareTo(message.getMessageID()) == 0;
}
}
+ return result;
}
}
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ArithmeticExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ArithmeticExpression.java?view=auto&rev=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ArithmeticExpression.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ArithmeticExpression.java Sun Aug 5 08:12:50 2007
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.nclient.jms.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+import org.apache.qpidity.QpidException;
+
+import javax.jms.Message;
+
+/**
+ * An expression which performs an operation on two expression values
+ */
+public abstract class ArithmeticExpression extends BinaryExpression
+{
+
+ protected static final int INTEGER = 1;
+ protected static final int LONG = 2;
+ protected static final int DOUBLE = 3;
+
+
+ public ArithmeticExpression(Expression left, Expression right)
+ {
+ super(left, right);
+ }
+
+ public static Expression createPlus(Expression left, Expression right)
+ {
+ return new ArithmeticExpression(left, right)
+ {
+ protected Object evaluate(Object lvalue, Object rvalue)
+ {
+ if (lvalue instanceof String)
+ {
+ String text = (String) lvalue;
+ return text + rvalue;
+ }
+ else if (lvalue instanceof Number)
+ {
+ return plus((Number) lvalue, asNumber(rvalue));
+ }
+
+ throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "+";
+ }
+ };
+ }
+
+ public static Expression createMinus(Expression left, Expression right)
+ {
+ return new ArithmeticExpression(left, right)
+ {
+ protected Object evaluate(Object lvalue, Object rvalue)
+ {
+ if (lvalue instanceof Number)
+ {
+ return minus((Number) lvalue, asNumber(rvalue));
+ }
+
+ throw new RuntimeException("Cannot call minus operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "-";
+ }
+ };
+ }
+
+ public static Expression createMultiply(Expression left, Expression right)
+ {
+ return new ArithmeticExpression(left, right)
+ {
+
+ protected Object evaluate(Object lvalue, Object rvalue)
+ {
+ if (lvalue instanceof Number)
+ {
+ return multiply((Number) lvalue, asNumber(rvalue));
+ }
+
+ throw new RuntimeException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "*";
+ }
+ };
+ }
+
+ public static Expression createDivide(Expression left, Expression right)
+ {
+ return new ArithmeticExpression(left, right)
+ {
+
+ protected Object evaluate(Object lvalue, Object rvalue)
+ {
+ if (lvalue instanceof Number)
+ {
+ return divide((Number) lvalue, asNumber(rvalue));
+ }
+
+ throw new RuntimeException("Cannot call divide operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "/";
+ }
+ };
+ }
+
+ public static Expression createMod(Expression left, Expression right)
+ {
+ return new ArithmeticExpression(left, right)
+ {
+
+ protected Object evaluate(Object lvalue, Object rvalue)
+ {
+ if (lvalue instanceof Number)
+ {
+ return mod((Number) lvalue, asNumber(rvalue));
+ }
+
+ throw new RuntimeException("Cannot call mod operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "%";
+ }
+ };
+ }
+
+ protected Number plus(Number left, Number right)
+ {
+ switch (numberType(left, right))
+ {
+
+ case INTEGER:
+ return new Integer(left.intValue() + right.intValue());
+
+ case LONG:
+ return new Long(left.longValue() + right.longValue());
+
+ default:
+ return new Double(left.doubleValue() + right.doubleValue());
+ }
+ }
+
+ protected Number minus(Number left, Number right)
+ {
+ switch (numberType(left, right))
+ {
+
+ case INTEGER:
+ return new Integer(left.intValue() - right.intValue());
+
+ case LONG:
+ return new Long(left.longValue() - right.longValue());
+
+ default:
+ return new Double(left.doubleValue() - right.doubleValue());
+ }
+ }
+
+ protected Number multiply(Number left, Number right)
+ {
+ switch (numberType(left, right))
+ {
+
+ case INTEGER:
+ return new Integer(left.intValue() * right.intValue());
+
+ case LONG:
+ return new Long(left.longValue() * right.longValue());
+
+ default:
+ return new Double(left.doubleValue() * right.doubleValue());
+ }
+ }
+
+ protected Number divide(Number left, Number right)
+ {
+ return new Double(left.doubleValue() / right.doubleValue());
+ }
+
+ protected Number mod(Number left, Number right)
+ {
+ return new Double(left.doubleValue() % right.doubleValue());
+ }
+
+ private int numberType(Number left, Number right)
+ {
+ if (isDouble(left) || isDouble(right))
+ {
+ return DOUBLE;
+ }
+ else if ((left instanceof Long) || (right instanceof Long))
+ {
+ return LONG;
+ }
+ else
+ {
+ return INTEGER;
+ }
+ }
+
+ private boolean isDouble(Number n)
+ {
+ return (n instanceof Float) || (n instanceof Double);
+ }
+
+ protected Number asNumber(Object value)
+ {
+ if (value instanceof Number)
+ {
+ return (Number) value;
+ }
+ else
+ {
+ throw new RuntimeException("Cannot convert value: " + value + " into a number");
+ }
+ }
+
+ public Object evaluate(Message message) throws QpidException
+ {
+ Object lvalue = left.evaluate(message);
+ if (lvalue == null)
+ {
+ return null;
+ }
+
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ {
+ return null;
+ }
+
+ return evaluate(lvalue, rvalue);
+ }
+
+ /**
+ * @param lvalue
+ * @param rvalue
+ * @return
+ */
+ protected abstract Object evaluate(Object lvalue, Object rvalue);
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ArithmeticExpression.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BinaryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BinaryExpression.java?view=auto&rev=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BinaryExpression.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BinaryExpression.java Sun Aug 5 08:12:50 2007
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.nclient.jms.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+/**
+ * An expression which performs an operation on two expression values.
+ */
+public abstract class BinaryExpression implements Expression
+{
+ protected Expression left;
+ protected Expression right;
+
+ public BinaryExpression(Expression left, Expression right)
+ {
+ this.left = left;
+ this.right = right;
+ }
+
+ public Expression getLeft()
+ {
+ return left;
+ }
+
+ public Expression getRight()
+ {
+ return right;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")";
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o)
+ {
+
+ if ((o == null) || !this.getClass().equals(o.getClass()))
+ {
+ return false;
+ }
+
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Returns the symbol that represents this binary expression. For example, addition is
+ * represented by "+"
+ *
+ * @return
+ */
+ public abstract String getExpressionSymbol();
+
+ /**
+ * @param expression
+ */
+ public void setRight(Expression expression)
+ {
+ right = expression;
+ }
+
+ /**
+ * @param expression
+ */
+ public void setLeft(Expression expression)
+ {
+ left = expression;
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BinaryExpression.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BooleanExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BooleanExpression.java?view=auto&rev=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BooleanExpression.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BooleanExpression.java Sun Aug 5 08:12:50 2007
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.nclient.jms.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.Message;
+
+/**
+ * A BooleanExpression is an expression that always
+ * produces a Boolean result.
+ */
+public interface BooleanExpression extends Expression
+{
+
+ public boolean matches(Message message) throws QpidException;
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/BooleanExpression.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ComparisonExpression.java?view=auto&rev=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ComparisonExpression.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ComparisonExpression.java Sun Aug 5 08:12:50 2007
@@ -0,0 +1,595 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.Message;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * A filter performing a comparison of two objects
+ */
+public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression
+{
+
+ public static BooleanExpression createBetween(Expression value, Expression left, Expression right)
+ {
+ return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
+ }
+
+ public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right)
+ {
+ return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
+ }
+
+ private static final HashSet REGEXP_CONTROL_CHARS = new HashSet();
+
+ static
+ {
+ REGEXP_CONTROL_CHARS.add(new Character('.'));
+ REGEXP_CONTROL_CHARS.add(new Character('\\'));
+ REGEXP_CONTROL_CHARS.add(new Character('['));
+ REGEXP_CONTROL_CHARS.add(new Character(']'));
+ REGEXP_CONTROL_CHARS.add(new Character('^'));
+ REGEXP_CONTROL_CHARS.add(new Character('$'));
+ REGEXP_CONTROL_CHARS.add(new Character('?'));
+ REGEXP_CONTROL_CHARS.add(new Character('*'));
+ REGEXP_CONTROL_CHARS.add(new Character('+'));
+ REGEXP_CONTROL_CHARS.add(new Character('{'));
+ REGEXP_CONTROL_CHARS.add(new Character('}'));
+ REGEXP_CONTROL_CHARS.add(new Character('|'));
+ REGEXP_CONTROL_CHARS.add(new Character('('));
+ REGEXP_CONTROL_CHARS.add(new Character(')'));
+ REGEXP_CONTROL_CHARS.add(new Character(':'));
+ REGEXP_CONTROL_CHARS.add(new Character('&'));
+ REGEXP_CONTROL_CHARS.add(new Character('<'));
+ REGEXP_CONTROL_CHARS.add(new Character('>'));
+ REGEXP_CONTROL_CHARS.add(new Character('='));
+ REGEXP_CONTROL_CHARS.add(new Character('!'));
+ }
+
+ static class LikeExpression extends UnaryExpression implements BooleanExpression
+ {
+
+ Pattern likePattern;
+
+ /**
+ * @param right
+ */
+ public LikeExpression(Expression right, String like, int escape)
+ {
+ super(right);
+
+ StringBuffer regexp = new StringBuffer(like.length() * 2);
+ regexp.append("\\A"); // The beginning of the input
+ for (int i = 0; i < like.length(); i++)
+ {
+ char c = like.charAt(i);
+ if (escape == (0xFFFF & c))
+ {
+ i++;
+ if (i >= like.length())
+ {
+ // nothing left to escape...
+ break;
+ }
+
+ char t = like.charAt(i);
+ regexp.append("\\x");
+ regexp.append(Integer.toHexString(0xFFFF & t));
+ }
+ else if (c == '%')
+ {
+ regexp.append(".*?"); // Do a non-greedy match
+ }
+ else if (c == '_')
+ {
+ regexp.append("."); // match one
+ }
+ else if (REGEXP_CONTROL_CHARS.contains(new Character(c)))
+ {
+ regexp.append("\\x");
+ regexp.append(Integer.toHexString(0xFFFF & c));
+ }
+ else
+ {
+ regexp.append(c);
+ }
+ }
+
+ regexp.append("\\z"); // The end of the input
+
+ likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL);
+ }
+
+ /**
+ * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol()
+ */
+ public String getExpressionSymbol()
+ {
+ return "LIKE";
+ }
+
+ /**
+ * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
+ */
+ public Object evaluate(Message message) throws QpidException
+ {
+
+ Object rv = this.getRight().evaluate(message);
+
+ if (rv == null)
+ {
+ return null;
+ }
+
+ if (!(rv instanceof String))
+ {
+ return
+ Boolean.FALSE;
+ // throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass());
+ }
+
+ return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ public boolean matches(Message message) throws QpidException
+ {
+ Object object = evaluate(message);
+
+ return (object != null) && (object == Boolean.TRUE);
+ }
+ }
+
+ public static BooleanExpression createLike(Expression left, String right, String escape)
+ {
+ if ((escape != null) && (escape.length() != 1))
+ {
+ throw new RuntimeException(
+ "The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape);
+ }
+
+ int c = -1;
+ if (escape != null)
+ {
+ c = 0xFFFF & escape.charAt(0);
+ }
+
+ return new LikeExpression(left, right, c);
+ }
+
+ public static BooleanExpression createNotLike(Expression left, String right, String escape)
+ {
+ return UnaryExpression.createNOT(createLike(left, right, escape));
+ }
+
+ public static BooleanExpression createInFilter(Expression left, List elements)
+ {
+
+ if (!(left instanceof PropertyExpression))
+ {
+ throw new RuntimeException("Expected a property for In expression, got: " + left);
+ }
+
+ return UnaryExpression.createInExpression((PropertyExpression) left, elements, false);
+
+ }
+
+ public static BooleanExpression createNotInFilter(Expression left, List elements)
+ {
+
+ if (!(left instanceof PropertyExpression))
+ {
+ throw new RuntimeException("Expected a property for In expression, got: " + left);
+ }
+
+ return UnaryExpression.createInExpression((PropertyExpression) left, elements, true);
+
+ }
+
+ public static BooleanExpression createIsNull(Expression left)
+ {
+ return doCreateEqual(left, ConstantExpression.NULL);
+ }
+
+ public static BooleanExpression createIsNotNull(Expression left)
+ {
+ return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL));
+ }
+
+ public static BooleanExpression createNotEqual(Expression left, Expression right)
+ {
+ return UnaryExpression.createNOT(createEqual(left, right));
+ }
+
+ public static BooleanExpression createEqual(Expression left, Expression right)
+ {
+ checkEqualOperand(left);
+ checkEqualOperand(right);
+ checkEqualOperandCompatability(left, right);
+
+ return doCreateEqual(left, right);
+ }
+
+ private static BooleanExpression doCreateEqual(Expression left, Expression right)
+ {
+ return new ComparisonExpression(left, right)
+ {
+
+ public Object evaluate(Message message) throws QpidException
+ {
+ Object lv = left.evaluate(message);
+ Object rv = right.evaluate(message);
+
+ // Iff one of the values is null
+ if ((lv == null) ^ (rv == null))
+ {
+ return Boolean.FALSE;
+ }
+
+ if ((lv == rv) || lv.equals(rv))
+ {
+ return Boolean.TRUE;
+ }
+
+ if ((lv instanceof Comparable) && (rv instanceof Comparable))
+ {
+ return compare((Comparable) lv, (Comparable) rv);
+ }
+
+ return Boolean.FALSE;
+ }
+
+ protected boolean asBoolean(int answer)
+ {
+ return answer == 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "=";
+ }
+ };
+ }
+
+ public static BooleanExpression createGreaterThan(final Expression left, final Expression right)
+ {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+
+ return new ComparisonExpression(left, right)
+ {
+ protected boolean asBoolean(int answer)
+ {
+ return answer > 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return ">";
+ }
+ };
+ }
+
+ public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right)
+ {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+
+ return new ComparisonExpression(left, right)
+ {
+ protected boolean asBoolean(int answer)
+ {
+ return answer >= 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return ">=";
+ }
+ };
+ }
+
+ public static BooleanExpression createLessThan(final Expression left, final Expression right)
+ {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+
+ return new ComparisonExpression(left, right)
+ {
+
+ protected boolean asBoolean(int answer)
+ {
+ return answer < 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "<";
+ }
+
+ };
+ }
+
+ public static BooleanExpression createLessThanEqual(final Expression left, final Expression right)
+ {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+
+ return new ComparisonExpression(left, right)
+ {
+
+ protected boolean asBoolean(int answer)
+ {
+ return answer <= 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "<=";
+ }
+ };
+ }
+
+ /**
+ * Only Numeric expressions can be used in >, >=, < or <= expressions.s
+ *
+ * @param expr
+ */
+ public static void checkLessThanOperand(Expression expr)
+ {
+ if (expr instanceof ConstantExpression)
+ {
+ Object value = ((ConstantExpression) expr).getValue();
+ if (value instanceof Number)
+ {
+ return;
+ }
+
+ // Else it's boolean or a String..
+ throw new RuntimeException("Value '" + expr + "' cannot be compared.");
+ }
+
+ if (expr instanceof BooleanExpression)
+ {
+ throw new RuntimeException("Value '" + expr + "' cannot be compared.");
+ }
+ }
+
+ /**
+ * Validates that the expression can be used in == or <> expression.
+ * Cannot not be NULL TRUE or FALSE litterals.
+ *
+ * @param expr
+ */
+ public static void checkEqualOperand(Expression expr)
+ {
+ if (expr instanceof ConstantExpression)
+ {
+ Object value = ((ConstantExpression) expr).getValue();
+ if (value == null)
+ {
+ throw new RuntimeException("'" + expr + "' cannot be compared.");
+ }
+ }
+ }
+
+ /**
+ *
+ * @param left
+ * @param right
+ */
+ private static void checkEqualOperandCompatability(Expression left, Expression right)
+ {
+ if ((left instanceof ConstantExpression) && (right instanceof ConstantExpression))
+ {
+ if ((left instanceof BooleanExpression) && !(right instanceof BooleanExpression))
+ {
+ throw new RuntimeException("'" + left + "' cannot be compared with '" + right + "'");
+ }
+ }
+ }
+
+ /**
+ * @param left
+ * @param right
+ */
+ public ComparisonExpression(Expression left, Expression right)
+ {
+ super(left, right);
+ }
+
+ public Object evaluate(Message message) throws QpidException
+ {
+ Comparable lv = (Comparable) left.evaluate(message);
+ if (lv == null)
+ {
+ return null;
+ }
+
+ Comparable rv = (Comparable) right.evaluate(message);
+ if (rv == null)
+ {
+ return null;
+ }
+
+ return compare(lv, rv);
+ }
+
+ protected Boolean compare(Comparable lv, Comparable rv)
+ {
+ Class lc = lv.getClass();
+ Class rc = rv.getClass();
+ // If the the objects are not of the same type,
+ // try to convert up to allow the comparison.
+ if (lc != rc)
+ {
+ if (lc == Byte.class)
+ {
+ if (rc == Short.class)
+ {
+ lv = new Short(((Number) lv).shortValue());
+ }
+ else if (rc == Integer.class)
+ {
+ lv = new Integer(((Number) lv).intValue());
+ }
+ else if (rc == Long.class)
+ {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class)
+ {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class)
+ {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Short.class)
+ {
+ if (rc == Integer.class)
+ {
+ lv = new Integer(((Number) lv).intValue());
+ }
+ else if (rc == Long.class)
+ {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class)
+ {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class)
+ {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Integer.class)
+ {
+ if (rc == Long.class)
+ {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class)
+ {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class)
+ {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Long.class)
+ {
+ if (rc == Integer.class)
+ {
+ rv = new Long(((Number) rv).longValue());
+ }
+ else if (rc == Float.class)
+ {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class)
+ {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Float.class)
+ {
+ if (rc == Integer.class)
+ {
+ rv = new Float(((Number) rv).floatValue());
+ }
+ else if (rc == Long.class)
+ {
+ rv = new Float(((Number) rv).floatValue());
+ }
+ else if (rc == Double.class)
+ {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Double.class)
+ {
+ if (rc == Integer.class)
+ {
+ rv = new Double(((Number) rv).doubleValue());
+ }
+ else if (rc == Long.class)
+ {
+ rv = new Double(((Number) rv).doubleValue());
+ }
+ else if (rc == Float.class)
+ {
+ rv = new Float(((Number) rv).doubleValue());
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+
+ return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ protected abstract boolean asBoolean(int answer);
+
+ public boolean matches(Message message) throws QpidException
+ {
+ Object object = evaluate(message);
+
+ return (object != null) && (object == Boolean.TRUE);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ComparisonExpression.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ConstantExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ConstantExpression.java?view=auto&rev=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ConstantExpression.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ConstantExpression.java Sun Aug 5 08:12:50 2007
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.jms.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpidity.QpidException;
+
+import java.math.BigDecimal;
+
+
+import javax.jms.Message;
+
+/**
+ * Represents a constant expression
+ */
+public class ConstantExpression implements Expression
+{
+
+ static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
+ {
+ public BooleanConstantExpression(Object value)
+ {
+ super(value);
+ }
+
+ public boolean matches(Message message) throws QpidException
+ {
+ Object object = evaluate(message);
+
+ return (object != null) && (object == Boolean.TRUE);
+ }
+ }
+
+ public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
+ public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE);
+ public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE);
+
+ private Object value;
+
+ public static ConstantExpression createFromDecimal(String text)
+ {
+
+ // Strip off the 'l' or 'L' if needed.
+ if (text.endsWith("l") || text.endsWith("L"))
+ {
+ text = text.substring(0, text.length() - 1);
+ }
+
+ Number value;
+ try
+ {
+ value = new Long(text);
+ }
+ catch (NumberFormatException e)
+ {
+ // The number may be too big to fit in a long.
+ value = new BigDecimal(text);
+ }
+
+ long l = value.longValue();
+ if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE))
+ {
+ value = new Integer(value.intValue());
+ }
+
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFromHex(String text)
+ {
+ Number value = new Long(Long.parseLong(text.substring(2), 16));
+ long l = value.longValue();
+ if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE))
+ {
+ value = new Integer(value.intValue());
+ }
+
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFromOctal(String text)
+ {
+ Number value = new Long(Long.parseLong(text, 8));
+ long l = value.longValue();
+ if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE))
+ {
+ value = new Integer(value.intValue());
+ }
+
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFloat(String text)
+ {
+ Number value = new Double(text);
+
+ return new ConstantExpression(value);
+ }
+
+ public ConstantExpression(Object value)
+ {
+ this.value = value;
+ }
+
+ public Object evaluate(Message message) throws QpidException
+ {
+ return value;
+ }
+
+ public Object getValue()
+ {
+ return value;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ if (value == null)
+ {
+ return "NULL";
+ }
+
+ if (value instanceof Boolean)
+ {
+ return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE";
+ }
+
+ if (value instanceof String)
+ {
+ return encodeString((String) value);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o)
+ {
+
+ if ((o == null) || !this.getClass().equals(o.getClass()))
+ {
+ return false;
+ }
+
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Encodes the value of string so that it looks like it would look like
+ * when it was provided in a selector.
+ *
+ * @param s
+ * @return
+ */
+ public static String encodeString(String s)
+ {
+ StringBuffer b = new StringBuffer();
+ b.append('\'');
+ for (int i = 0; i < s.length(); i++)
+ {
+ char c = s.charAt(i);
+ if (c == '\'')
+ {
+ b.append(c);
+ }
+
+ b.append(c);
+ }
+
+ b.append('\'');
+
+ return b.toString();
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/ConstantExpression.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/Expression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/Expression.java?view=auto&rev=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/Expression.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/Expression.java Sun Aug 5 08:12:50 2007
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.nclient.jms.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.Message;
+
+/**
+ * Represents an expression
+ */
+public interface Expression
+{
+ /**
+ * @param message The message to evaluate
+ * @return the value of this expression
+ */
+ public Object evaluate(Message message) throws QpidException;
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/Expression.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/JMSSelectorFilter.java?view=auto&rev=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/JMSSelectorFilter.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/JMSSelectorFilter.java Sun Aug 5 08:12:50 2007
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.nclient.jms.filter;
+
+import org.apache.qpid.nclient.jms.filter.selector.SelectorParser;
+import org.apache.qpidity.QpidException;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import javax.jms.Message;
+
+
+public class JMSSelectorFilter implements MessageFilter
+{
+ /**
+ * this JMSSelectorFilter's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class);
+
+ private String _selector;
+ private BooleanExpression _matcher;
+
+ public JMSSelectorFilter(String selector) throws QpidException
+ {
+ _selector = selector;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Created JMSSelectorFilter with selector:" + _selector);
+ }
+ _matcher = new SelectorParser().parse(selector);
+ }
+
+ public boolean matches(Message message)
+ {
+ try
+ {
+ boolean match = _matcher.matches(message);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(message + " match(" + match + ") selector(" + System
+ .identityHashCode(_selector) + "):" + _selector);
+ }
+ return match;
+ }
+ catch (QpidException e)
+ {
+ _logger.warn("Caght exception when evaluating message selector for message " + message, e);
+ }
+ return false;
+ }
+
+ public String getSelector()
+ {
+ return _selector;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/JMSSelectorFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/LogicExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/LogicExpression.java?view=auto&rev=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/LogicExpression.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/LogicExpression.java Sun Aug 5 08:12:50 2007
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.nclient.jms.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.Message;
+
+/**
+ * A filter performing a comparison of two objects
+ */
+public abstract class LogicExpression extends BinaryExpression implements BooleanExpression
+{
+
+ public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue)
+ {
+ return new LogicExpression(lvalue, rvalue)
+ {
+
+ public Object evaluate(Message message) throws QpidException
+ {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+ // Can we do an OR shortcut??
+ if ((lv != null) && lv.booleanValue())
+ {
+ return Boolean.TRUE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+
+ return (rv == null) ? null : rv;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "OR";
+ }
+ };
+ }
+
+ public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue)
+ {
+ return new LogicExpression(lvalue, rvalue)
+ {
+
+ public Object evaluate(Message message) throws QpidException
+ {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+
+ // Can we do an AND shortcut??
+ if (lv == null)
+ {
+ return null;
+ }
+
+ if (!lv.booleanValue())
+ {
+ return Boolean.FALSE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+
+ return (rv == null) ? null : rv;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "AND";
+ }
+ };
+ }
+
+ /**
+ * @param left
+ * @param right
+ */
+ public LogicExpression(BooleanExpression left, BooleanExpression right)
+ {
+ super(left, right);
+ }
+
+ public abstract Object evaluate(Message message) throws QpidException;
+
+ public boolean matches(Message message) throws QpidException
+ {
+ Object object = evaluate(message);
+
+ return (object != null) && (object == Boolean.TRUE);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/LogicExpression.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/MessageFilter.java?view=auto&rev=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/MessageFilter.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/MessageFilter.java Sun Aug 5 08:12:50 2007
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.nclient.jms.filter;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.Message;
+
+public interface MessageFilter
+{
+ boolean matches(Message message) throws QpidException;
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/MessageFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/PropertyExpression.java?view=auto&rev=562885
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/PropertyExpression.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/PropertyExpression.java Sun Aug 5 08:12:50 2007
@@ -0,0 +1,93 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient.jms.filter;
+
+import org.apache.qpidity.QpidException;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Message;
+import java.lang.reflect.Method;
+
+/**
+ * Represents a property expression
+ */
+public class PropertyExpression implements Expression
+{
+ private static final org.slf4j.Logger _logger = LoggerFactory.getLogger(PropertyExpression.class);
+
+ private Method _getter;
+
+ public PropertyExpression(String name)
+ {
+ Class clazz = Message.class;
+ try
+ {
+ _getter = clazz.getMethod("get" + name, null);
+ }
+ catch (NoSuchMethodException e)
+ {
+ _logger.warn("Cannot compare property: " + name, e);
+ }
+ }
+
+ public Object evaluate(Message message) throws QpidException
+ {
+ Object result = null;
+ if( _getter != null )
+ {
+ try
+ {
+ result = _getter.invoke(message, null);
+ }
+ catch (Exception e)
+ {
+ throw new QpidException("cannot evaluate property ", "message selector", e);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @see Object#toString()
+ */
+ public String toString()
+ {
+ return _getter.toString();
+ }
+
+ /**
+ * @see Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return _getter.hashCode();
+ }
+
+ /**
+ * @see Object#equals(Object)
+ */
+ public boolean equals(Object o)
+ {
+ if ((o == null) || !this.getClass().equals(o.getClass()))
+ {
+ return false;
+ }
+ return _getter.equals(((PropertyExpression) o)._getter);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/filter/PropertyExpression.java
------------------------------------------------------------------------------
svn:eol-style = native