You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/18 20:33:40 UTC

svn commit: r577011 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQSession.java AMQSession_0_10.java AMQSession_0_8.java BasicMessageConsumer_0_10.java

Author: arnaudsimon
Date: Tue Sep 18 11:33:39 2007
New Revision: 577011

URL: http://svn.apache.org/viewvc?rev=577011&view=rev
Log:
added message selector evaluation (for 0_10 only) 

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Sep 18 11:33:39 2007
@@ -1552,7 +1552,7 @@
 
     public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
             final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
-            final boolean noConsume, final boolean autoClose);
+            final boolean noConsume, final boolean autoClose) throws JMSException;
 
     /**
      * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Sep 18 11:33:39 2007
@@ -79,6 +79,7 @@
      * @param messageFactoryRegistry  The message factory factory for the session.
      * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
+     * @param qpidConnection          The qpid connection
      */
     AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
                     boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
@@ -107,6 +108,7 @@
      * @param acknowledgeMode     The acknoledgement mode for the session.
      * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
+     * @param qpidConnection      The connection
      */
     AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection, AMQConnection con, int channelId,
                     boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
@@ -276,7 +278,7 @@
                                                       final int prefetchLow, final boolean noLocal,
                                                       final boolean exclusive, String messageSelector,
                                                       final FieldTable ft, final boolean noConsume,
-                                                      final boolean autoClose)
+                                                      final boolean autoClose) throws JMSException
     {
 
         final AMQProtocolHandler protocolHandler = getProtocolHandler();
@@ -304,8 +306,18 @@
                             boolean nowait, String messageSelector, AMQShortString tag)
             throws AMQException, FailoverException
     {
+        boolean preAcquire;
+        try
+        {
+            preAcquire = consumer.getMessageSelector() == null || !(consumer.getDestination() instanceof AMQQueue);
+        }
+        catch (JMSException e)
+        {
+           throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
+        }
         getQpidSession().messageSubscribe(queueName.toString(), tag.toString(), Session.TRANSFER_CONFIRM_MODE_REQUIRED,
-                                          Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+                                          preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE :
+                                                  Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
                                           new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
                                           consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
                                           consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Sep 18 11:33:39 2007
@@ -327,7 +327,7 @@
 
     public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
             final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft,
-            final boolean noConsume, final boolean autoClose)
+            final boolean noConsume, final boolean autoClose)  throws JMSException
     {
 
         final AMQProtocolHandler protocolHandler = getProtocolHandler();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Sep 18 11:33:39 2007
@@ -27,10 +27,15 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpidity.api.Message;
 import org.apache.qpidity.transport.Struct;
 import org.apache.qpidity.transport.ExchangeQueryResult;
 import org.apache.qpidity.transport.Future;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.filter.JMSSelectorFilter;
 
 import javax.jms.JMSException;
 import java.io.IOException;
@@ -47,51 +52,105 @@
      */
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
+    /**
+     * The message selector filter associated with this consumer message selector
+     */
+    private MessageFilter _filter = null;
+
+    /**
+     * The underlying QpidSession
+     */
+    private AMQSession_0_10 _0_10session;
+
+    /**
+     * Indicates whether this consumer receives pre-acquired messages
+     */
+    private boolean _preAcquire = true;
+
+    //--- constructor 
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                                         AMQSession session, AMQProtocolHandler protocolHandler,
                                         FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
                                         boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+            throws JMSException
     {
         super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
               rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
-
+        _0_10session = (AMQSession_0_10) session;
+        if (messageSelector != null)
+        {
+            try
+            {
+                _filter = new JMSSelectorFilter(messageSelector);
+            }
+            catch (QpidException e)
+            {
+                throw new JMSException("cannot create consumer because of selector issue");
+            }
+            if (destination instanceof AMQQueue)
+            {
+                _preAcquire = false;
+            }
+        }
     }
 
     // ----- Interface org.apache.qpidity.client.util.MessageListener
     public void onMessage(Message message)
     {
-        int channelId = getSession().getChannelId();
-        long deliveryId = message.getMessageTransferId();
-        String consumerTag = getConsumerTag().toString();
-        AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
-        AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
-        boolean redelivered = message.getDeliveryProperties().getRedelivered();
-        UnprocessedMessage_0_10 newMessage =
-                new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+        boolean messageOk = false;
         try
         {
-            newMessage.receiveBody(message.readData());
+            messageOk = checkPreConditions(message);
         }
-        catch (IOException e)
+        catch (AMQException e)
         {
-            getSession().getAMQConnection().exceptionReceived(e);
+            try
+            {
+                getSession().getAMQConnection().getExceptionListener()
+                        .onException(new JMSAMQException("Error when receiving message", e));
+            }
+            catch (JMSException e1)
+            {
+               // we should silently log thie exception as it only hanppens when the connection is closed
+               _logger.error("Exception when receiving message", e1);
+            }
         }
-        Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
-        // if there is a replyto destination then we need to request the exchange info
-        if (! message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
-        {
-            Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
-                    .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
-            ExchangeQueryResult res = future.get();
-            // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-            String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
-                    .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
-                    .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
-            newMessage.setReplyToURL(replyToUrl);
+        if (messageOk)
+        {
+            int channelId = getSession().getChannelId();
+            long deliveryId = message.getMessageTransferId();
+            String consumerTag = getConsumerTag().toString();
+            AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
+            AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
+            boolean redelivered = message.getDeliveryProperties().getRedelivered();
+            UnprocessedMessage_0_10 newMessage =
+                    new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+            try
+            {
+                newMessage.receiveBody(message.readData());
+            }
+            catch (IOException e)
+            {
+                getSession().getAMQConnection().exceptionReceived(e);
+            }
+            Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+            // if there is a replyto destination then we need to request the exchange info
+            if (!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
+            {
+                Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
+                        .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
+                ExchangeQueryResult res = future.get();
+                // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+                String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+                        .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
+                        .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
+                newMessage.setReplyToURL(replyToUrl);
+            }
+            newMessage.setContentHeader(headers);
+            getSession().messageReceived(newMessage);
         }
-        newMessage.setContentHeader(headers);
-        getSession().messageReceived(newMessage);
+        // else ignore this message
     }
 
     //----- overwritten methods
@@ -130,6 +189,128 @@
     {
         return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
                                              messageFrame.getExchange(), messageFrame.getRoutingKey(),
-                                             messageFrame.getContentHeader(), messageFrame.getBodies(), messageFrame.getReplyToURL());
+                                             messageFrame.getContentHeader(), messageFrame.getBodies(),
+                                             messageFrame.getReplyToURL());
+    }
+
+    // private methods
+    /**
+     * Check whether a message can be delivered to this consumer.
+     *
+     * @param message The message to be checked.
+     * @return true if the message matches the selector and can be acquired, false otherwise.
+     * @throws AMQException If the message preConditions cannot be checked due to some internal error.
+     */
+    private boolean checkPreConditions(Message message) throws AMQException
+    {
+        boolean messageOk = true;
+        // TODO Use a tag for fiding out if message filtering is done here or by the broker. 
+        try
+        {
+            if (getMessageSelector() != null)
+            {
+                messageOk = _filter.matches((javax.jms.Message) message);
+            }
+        }
+        catch (Exception e)
+        {
+            throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
+        }
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("messageOk " + messageOk);
+            _logger.debug("_preAcquire " + _preAcquire);
+        }
+        if (!messageOk && _preAcquire)
+        {
+            // this is the case for topics
+            // We need to ack this message
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("filterMessage - trying to ack message");
+            }
+            acknowledgeMessage(message);
+        }
+        else if (!messageOk)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Message not OK, releasing");
+            }
+            releaseMessage(message);
+        }
+        // now we need to acquire this message if needed
+        // this is the case of queue with a message selector set
+        if (!_preAcquire && messageOk)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("filterMessage - trying to acquire message");
+            }
+            messageOk = acquireMessage(message);
+        }
+        return messageOk;
+    }
+
+    /**
+     * Acknowledge a message
+     *
+     * @param message The message to be acknowledged
+     * @throws AMQException If the message cannot be acquired due to some internal error.
+     */
+    private void acknowledgeMessage(Message message) throws AMQException
+    {
+        if (!_preAcquire)
+        {
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageTransferId());
+            _0_10session.getQpidSession().messageAcknowledge(ranges);
+            _0_10session.getCurrentException();
+        }
+    }
+
+    /**
+     * Release a message
+     *
+     * @param message The message to be released
+     * @throws AMQException If the message cannot be released due to some internal error.
+     */
+    private void releaseMessage(Message message) throws AMQException
+    {
+        if (_preAcquire)
+        {
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageTransferId());
+            _0_10session.getQpidSession().messageRelease(ranges);
+            _0_10session.getCurrentException();
+        }
+    }
+
+    /**
+     * Acquire a message
+     *
+     * @param message The message to be acquired
+     * @return true if the message has been acquired, false otherwise.
+     * @throws AMQException If the message cannot be acquired due to some internal error.
+     */
+    private boolean acquireMessage(Message message) throws AMQException
+    {
+        boolean result = false;
+        if (!_preAcquire)
+        {
+            RangeSet ranges = new RangeSet();
+            ranges.add(message.getMessageTransferId());
+
+            _0_10session.getQpidSession()
+                    .messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+            _0_10session.getQpidSession().sync();
+            RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages();
+            if (acquired.size() > 0)
+            {
+                result = true;
+            }
+            _0_10session.getCurrentException();
+        }
+        return result;
     }
 }