You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/19 17:03:29 UTC

svn commit: r577326 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Author: arnaudsimon
Date: Wed Sep 19 08:03:28 2007
New Revision: 577326

URL: http://svn.apache.org/viewvc?rev=577326&view=rev
Log:
updated for using jms message for filtering incoming messages

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=577326&r1=577325&r2=577326&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Wed Sep 19 08:03:28 2007
@@ -96,12 +96,17 @@
     }
 
     // ----- Interface org.apache.qpidity.client.util.MessageListener
-    public void onMessage(Message message)
+
+    /**
+     * @param jmsMessage this message has already been processed so can't redo preDeliver
+     * @param channelId
+     */
+    public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
     {
         boolean messageOk = false;
         try
         {
-            messageOk = checkPreConditions(message);
+            messageOk = checkPreConditions(jmsMessage);
         }
         catch (AMQException e)
         {
@@ -112,44 +117,51 @@
             }
             catch (JMSException e1)
             {
-               // we should silently log thie exception as it only hanppens when the connection is closed
-               _logger.error("Exception when receiving message", e1);
+                // we should silently log thie exception as it only hanppens when the connection is closed
+                _logger.error("Exception when receiving message", e1);
             }
         }
         if (messageOk)
         {
-            int channelId = getSession().getChannelId();
-            long deliveryId = message.getMessageTransferId();
-            String consumerTag = getConsumerTag().toString();
-            AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
-            AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
-            boolean redelivered = message.getDeliveryProperties().getRedelivered();
-            UnprocessedMessage_0_10 newMessage =
-                    new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
-            try
-            {
-                newMessage.receiveBody(message.readData());
-            }
-            catch (IOException e)
-            {
-                getSession().getAMQConnection().exceptionReceived(e);
-            }
-            Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
-            // if there is a replyto destination then we need to request the exchange info
-            if (!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
-            {
-                Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
-                        .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
-                ExchangeQueryResult res = future.get();
-                // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-                String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
-                        .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
-                        .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
-                newMessage.setReplyToURL(replyToUrl);
-            }
-            newMessage.setContentHeader(headers);
-            getSession().messageReceived(newMessage);
+            super.notifyMessage(jmsMessage, channelId);
+        }
+    }
+
+
+    public void onMessage(Message message)
+    {
+        boolean messageOk = false;
+        int channelId = getSession().getChannelId();
+        long deliveryId = message.getMessageTransferId();
+        String consumerTag = getConsumerTag().toString();
+        AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
+        AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
+        boolean redelivered = message.getDeliveryProperties().getRedelivered();
+        UnprocessedMessage_0_10 newMessage =
+                new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+        try
+        {
+            newMessage.receiveBody(message.readData());
+        }
+        catch (IOException e)
+        {
+            getSession().getAMQConnection().exceptionReceived(e);
+        }
+        Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+        // if there is a replyto destination then we need to request the exchange info
+        if (!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
+        {
+            Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
+                    .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
+            ExchangeQueryResult res = future.get();
+            // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+            String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+                    .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
+                    .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
+            newMessage.setReplyToURL(replyToUrl);
         }
+        newMessage.setContentHeader(headers);
+        getSession().messageReceived(newMessage);
         // else ignore this message
     }
 
@@ -201,7 +213,7 @@
      * @return true if the message matches the selector and can be acquired, false otherwise.
      * @throws AMQException If the message preConditions cannot be checked due to some internal error.
      */
-    private boolean checkPreConditions(Message message) throws AMQException
+    private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
     {
         boolean messageOk = true;
         // TODO Use a tag for fiding out if message filtering is done here or by the broker. 
@@ -258,12 +270,12 @@
      * @param message The message to be acknowledged
      * @throws AMQException If the message cannot be acquired due to some internal error.
      */
-    private void acknowledgeMessage(Message message) throws AMQException
+    private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
     {
         if (!_preAcquire)
         {
             RangeSet ranges = new RangeSet();
-            ranges.add(message.getMessageTransferId());
+            ranges.add(message.getDeliveryTag());
             _0_10session.getQpidSession().messageAcknowledge(ranges);
             _0_10session.getCurrentException();
         }
@@ -275,12 +287,12 @@
      * @param message The message to be released
      * @throws AMQException If the message cannot be released due to some internal error.
      */
-    private void releaseMessage(Message message) throws AMQException
+    private void releaseMessage(AbstractJMSMessage message) throws AMQException
     {
         if (_preAcquire)
         {
             RangeSet ranges = new RangeSet();
-            ranges.add(message.getMessageTransferId());
+            ranges.add(message.getDeliveryTag());
             _0_10session.getQpidSession().messageRelease(ranges);
             _0_10session.getCurrentException();
         }
@@ -293,13 +305,13 @@
      * @return true if the message has been acquired, false otherwise.
      * @throws AMQException If the message cannot be acquired due to some internal error.
      */
-    private boolean acquireMessage(Message message) throws AMQException
+    private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
     {
         boolean result = false;
         if (!_preAcquire)
         {
             RangeSet ranges = new RangeSet();
-            ranges.add(message.getMessageTransferId());
+            ranges.add(message.getDeliveryTag());
 
             _0_10session.getQpidSession()
                     .messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);