You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/18 20:33:40 UTC
svn commit: r577011 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQSession.java AMQSession_0_10.java AMQSession_0_8.java
BasicMessageConsumer_0_10.java
Author: arnaudsimon
Date: Tue Sep 18 11:33:39 2007
New Revision: 577011
URL: http://svn.apache.org/viewvc?rev=577011&view=rev
Log:
added message selector evaluation (for 0_10 only)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Sep 18 11:33:39 2007
@@ -1552,7 +1552,7 @@
public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
- final boolean noConsume, final boolean autoClose);
+ final boolean noConsume, final boolean autoClose) throws JMSException;
/**
* Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Sep 18 11:33:39 2007
@@ -79,6 +79,7 @@
* @param messageFactoryRegistry The message factory factory for the session.
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
+ * @param qpidConnection The qpid connection
*/
AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
@@ -107,6 +108,7 @@
* @param acknowledgeMode The acknoledgement mode for the session.
* @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
+ * @param qpidConnection The connection
*/
AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
@@ -276,7 +278,7 @@
final int prefetchLow, final boolean noLocal,
final boolean exclusive, String messageSelector,
final FieldTable ft, final boolean noConsume,
- final boolean autoClose)
+ final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
@@ -304,8 +306,18 @@
boolean nowait, String messageSelector, AMQShortString tag)
throws AMQException, FailoverException
{
+ boolean preAcquire;
+ try
+ {
+ preAcquire = consumer.getMessageSelector() == null || !(consumer.getDestination() instanceof AMQQueue);
+ }
+ catch (JMSException e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
+ }
getQpidSession().messageSubscribe(queueName.toString(), tag.toString(), Session.TRANSFER_CONFIRM_MODE_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE :
+ Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Sep 18 11:33:39 2007
@@ -327,7 +327,7 @@
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft,
- final boolean noConsume, final boolean autoClose)
+ final boolean noConsume, final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Sep 18 11:33:39 2007
@@ -27,10 +27,15 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.transport.ExchangeQueryResult;
import org.apache.qpidity.transport.Future;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.filter.JMSSelectorFilter;
import javax.jms.JMSException;
import java.io.IOException;
@@ -47,51 +52,105 @@
*/
protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ /**
+ * The message filter built from this consumer's message selector
+ */
+ private MessageFilter _filter = null;
+
+ /**
+ * The underlying QpidSession
+ */
+ private AMQSession_0_10 _0_10session;
+
+ /**
+ * Indicates whether this consumer receives pre-acquired messages
+ */
+ private boolean _preAcquire = true;
+
+ //--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
-
+ _0_10session = (AMQSession_0_10) session;
+ if (messageSelector != null)
+ {
+ try
+ {
+ _filter = new JMSSelectorFilter(messageSelector);
+ }
+ catch (QpidException e)
+ {
+ throw new JMSException("cannot create consumer because of selector issue");
+ }
+ if (destination instanceof AMQQueue)
+ {
+ _preAcquire = false;
+ }
+ }
}
// ----- Interface org.apache.qpidity.client.util.MessageListener
public void onMessage(Message message)
{
- int channelId = getSession().getChannelId();
- long deliveryId = message.getMessageTransferId();
- String consumerTag = getConsumerTag().toString();
- AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
- AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
- boolean redelivered = message.getDeliveryProperties().getRedelivered();
- UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+ boolean messageOk = false;
try
{
- newMessage.receiveBody(message.readData());
+ messageOk = checkPreConditions(message);
}
- catch (IOException e)
+ catch (AMQException e)
{
- getSession().getAMQConnection().exceptionReceived(e);
+ try
+ {
+ getSession().getAMQConnection().getExceptionListener()
+ .onException(new JMSAMQException("Error when receiving message", e));
+ }
+ catch (JMSException e1)
+ {
+ // we should silently log this exception as it only happens when the connection is closed
+ _logger.error("Exception when receiving message", e1);
+ }
}
- Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
- // if there is a replyto destination then we need to request the exchange info
- if (! message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
- {
- Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
- .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
- ExchangeQueryResult res = future.get();
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
- .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
- .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
- newMessage.setReplyToURL(replyToUrl);
+ if (messageOk)
+ {
+ int channelId = getSession().getChannelId();
+ long deliveryId = message.getMessageTransferId();
+ String consumerTag = getConsumerTag().toString();
+ AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
+ AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
+ boolean redelivered = message.getDeliveryProperties().getRedelivered();
+ UnprocessedMessage_0_10 newMessage =
+ new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+ try
+ {
+ newMessage.receiveBody(message.readData());
+ }
+ catch (IOException e)
+ {
+ getSession().getAMQConnection().exceptionReceived(e);
+ }
+ Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+ // if there is a replyto destination then we need to request the exchange info
+ if (!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
+ {
+ Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
+ .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
+ ExchangeQueryResult res = future.get();
+ // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+ .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
+ .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
+ newMessage.setReplyToURL(replyToUrl);
+ }
+ newMessage.setContentHeader(headers);
+ getSession().messageReceived(newMessage);
}
- newMessage.setContentHeader(headers);
- getSession().messageReceived(newMessage);
+ // else ignore this message
}
//----- overwritten methods
@@ -130,6 +189,128 @@
{
return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
messageFrame.getExchange(), messageFrame.getRoutingKey(),
- messageFrame.getContentHeader(), messageFrame.getBodies(), messageFrame.getReplyToURL());
+ messageFrame.getContentHeader(), messageFrame.getBodies(),
+ messageFrame.getReplyToURL());
+ }
+
+ // private methods
+ /**
+ * Check whether a message can be delivered to this consumer.
+ *
+ * @param message The message to be checked.
+ * @return true if the message matches the selector and can be acquired, false otherwise.
+ * @throws AMQException If the message preConditions cannot be checked due to some internal error.
+ */
+ private boolean checkPreConditions(Message message) throws AMQException
+ {
+ boolean messageOk = true;
+ // TODO Use a tag for finding out if message filtering is done here or by the broker.
+ try
+ {
+ if (getMessageSelector() != null)
+ {
+ messageOk = _filter.matches((javax.jms.Message) message);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
+ }
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("messageOk " + messageOk);
+ _logger.debug("_preAcquire " + _preAcquire);
+ }
+ if (!messageOk && _preAcquire)
+ {
+ // this is the case for topics
+ // We need to ack this message
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to ack message");
+ }
+ acknowledgeMessage(message);
+ }
+ else if (!messageOk)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message not OK, releasing");
+ }
+ releaseMessage(message);
+ }
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
+ if (!_preAcquire && messageOk)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to acquire message");
+ }
+ messageOk = acquireMessage(message);
+ }
+ return messageOk;
+ }
+
+ /**
+ * Acknowledge a message
+ *
+ * @param message The message to be acknowledged
+ * @throws AMQException If the message cannot be acknowledged due to some internal error.
+ */
+ private void acknowledgeMessage(Message message) throws AMQException
+ {
+ if (!_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+ _0_10session.getQpidSession().messageAcknowledge(ranges);
+ _0_10session.getCurrentException();
+ }
+ }
+
+ /**
+ * Release a message
+ *
+ * @param message The message to be released
+ * @throws AMQException If the message cannot be released due to some internal error.
+ */
+ private void releaseMessage(Message message) throws AMQException
+ {
+ if (_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+ _0_10session.getQpidSession().messageRelease(ranges);
+ _0_10session.getCurrentException();
+ }
+ }
+
+ /**
+ * Acquire a message
+ *
+ * @param message The message to be acquired
+ * @return true if the message has been acquired, false otherwise.
+ * @throws AMQException If the message cannot be acquired due to some internal error.
+ */
+ private boolean acquireMessage(Message message) throws AMQException
+ {
+ boolean result = false;
+ if (!_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+
+ _0_10session.getQpidSession()
+ .messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+ _0_10session.getQpidSession().sync();
+ RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages();
+ if (acquired.size() > 0)
+ {
+ result = true;
+ }
+ _0_10session.getCurrentException();
+ }
+ return result;
}
}