You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2011/10/11 18:00:17 UTC

svn commit: r1181861 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/filter/ systests/src/main/java/org/apache/qpid/test/unit/topic/ test-profiles/

Author: kwall
Date: Tue Oct 11 16:00:17 2011
New Revision: 1181861

URL: http://svn.apache.org/viewvc?rev=1181861&view=rev
Log:
QPID-3542: Java client does not ack non-matching messages when using client side selectors (CPP Broker)

Applied patch from Andrew MacBean <an...@gmail.com> and myself.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
    qpid/trunk/qpid/java/test-profiles/CPPExcludes

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1181861&r1=1181860&r2=1181861&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Oct 11 16:00:17 2011
@@ -150,13 +150,20 @@ public class BasicMessageConsumer_0_10 e
             {
                 if (isMessageListenerSet() && capacity == 0)
                 {
-                    _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                              MessageCreditUnit.MESSAGE, 1,
-                                                              Option.UNRELIABLE);
+                    messageFlow();
                 }
                 _logger.debug("messageOk, trying to notify");
                 super.notifyMessage(jmsMessage);
             }
+            else
+            {
+                // if we are synchronously waiting for a message
+                // and messages are not pre-fetched we then need to request another one
+                if(capacity == 0)
+                {
+                   messageFlow();
+                }
+            }
         }
         catch (AMQException e)
         {
@@ -245,6 +252,7 @@ public class BasicMessageConsumer_0_10 e
             _logger.debug("messageOk " + messageOk);
             _logger.debug("_preAcquire " + _preAcquire);
         }
+
         if (!messageOk)
         {
             if (_preAcquire)
@@ -261,19 +269,11 @@ public class BasicMessageConsumer_0_10 e
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Message not OK, releasing");
+                    _logger.debug("filterMessage - not ack'ing messaage as not aquired");
                 }
-                releaseMessage(message);
-            }
-            // if we are syncrhonously waiting for a message
-            // and messages are not prefetched we then need to request another one
-            if(capacity == 0)
-            {
-               _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                         MessageCreditUnit.MESSAGE, 1,
-                                                         Option.UNRELIABLE);
             }
         }
+
         // now we need to acquire this message if needed
         // this is the case of queue with a message selector set
         if (!_preAcquire && messageOk && !isNoConsume())
@@ -285,6 +285,7 @@ public class BasicMessageConsumer_0_10 e
             messageOk = acquireMessage(message);
             _logger.debug("filterMessage - message acquire status : " + messageOk);
         }
+
         return messageOk;
     }
 
@@ -295,38 +296,18 @@ public class BasicMessageConsumer_0_10 e
      * @param message The message to be acknowledged
      * @throws AMQException If the message cannot be acquired due to some internal error.
      */
-    private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
+    private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
     {
-        if (!_preAcquire)
-        {
-            RangeSet ranges = new RangeSet();
-            ranges.add((int) message.getDeliveryTag());
-            _0_10session.messageAcknowledge
-                (ranges,
-                 _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
-
-            AMQException amqe = _0_10session.getCurrentException();
-            if (amqe != null)
-            {
-                throw amqe;
-            }
-        }
-    }
+        final RangeSet ranges = new RangeSet();
+        ranges.add((int) message.getDeliveryTag());
+        _0_10session.messageAcknowledge
+            (ranges,
+             _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
 
-    /**
-     * Release a message
-     *
-     * @param message The message to be released
-     * @throws AMQException If the message cannot be released due to some internal error.
-     */
-    private void releaseMessage(AbstractJMSMessage message) throws AMQException
-    {
-        if (_preAcquire)
+        final AMQException amqe = _0_10session.getCurrentException();
+        if (amqe != null)
         {
-            RangeSet ranges = new RangeSet();
-            ranges.add((int) message.getDeliveryTag());
-            _0_10session.getQpidSession().messageRelease(ranges);
-            _0_10session.sync();
+            throw amqe;
         }
     }
 
@@ -337,25 +318,28 @@ public class BasicMessageConsumer_0_10 e
      * @return true if the message has been acquired, false otherwise.
      * @throws AMQException If the message cannot be acquired due to some internal error.
      */
-    private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
+    private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
     {
         boolean result = false;
-        if (!_preAcquire)
-        {
-            RangeSet ranges = new RangeSet();
-            ranges.add((int) message.getDeliveryTag());
+        final RangeSet ranges = new RangeSet();
+        ranges.add((int) message.getDeliveryTag());
 
-            Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+        final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
 
-            RangeSet acquired = acq.getTransfers();
-            if (acquired != null && acquired.size() > 0)
-            {
-                result = true;
-            }
+        final RangeSet acquired = acq.getTransfers();
+        if (acquired != null && acquired.size() > 0)
+        {
+            result = true;
         }
         return result;
     }
 
+    private void messageFlow()
+    {
+        _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                  MessageCreditUnit.MESSAGE, 1,
+                                                  Option.UNRELIABLE);
+    }
 
     public void setMessageListener(final MessageListener messageListener) throws JMSException
     {
@@ -364,9 +348,7 @@ public class BasicMessageConsumer_0_10 e
         {
             if (messageListener != null && capacity == 0)
             {
-                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                          MessageCreditUnit.MESSAGE, 1,
-                                                          Option.UNRELIABLE);
+                messageFlow();
             }
             if (messageListener != null && !_synchronousQueue.isEmpty())
             {
@@ -389,9 +371,7 @@ public class BasicMessageConsumer_0_10 e
     {
         if (_0_10session.isStarted() && _syncReceive.get())
         {
-            _0_10session.getQpidSession().messageFlow
-                (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
-                 Option.UNRELIABLE);
+            messageFlow();
         }
     }
 
@@ -412,9 +392,7 @@ public class BasicMessageConsumer_0_10 e
         }
         if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
         {
-            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                      MessageCreditUnit.MESSAGE, 1,
-                                                      Option.UNRELIABLE);
+            messageFlow();
         }
         Object o = super.getMessageFromQueue(l);
         if (o == null && _0_10session.isStarted())

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java?rev=1181861&r1=1181860&r2=1181861&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java Tue Oct 11 16:00:17 2011
@@ -37,9 +37,9 @@ public class JMSSelectorFilter implement
     public JMSSelectorFilter(String selector) throws AMQInternalException
     {
         _selector = selector;
-        if (JMSSelectorFilter._logger.isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
-            JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector);
+            _logger.debug("Created JMSSelectorFilter with selector:" + _selector);
         }
         _matcher = new SelectorParser().parse(selector);
     }
@@ -49,16 +49,16 @@ public class JMSSelectorFilter implement
         try
         {
             boolean match = _matcher.matches(message);
-            if (JMSSelectorFilter._logger.isDebugEnabled())
+            if (_logger.isDebugEnabled())
             {
-                JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System
+                _logger.debug(message + " match(" + match + ") selector(" + System
                         .identityHashCode(_selector) + "):" + _selector);
             }
             return match;
         }
         catch (AMQInternalException e)
         {
-            JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message  " + message, e);
+            _logger.warn("Caught exception when evaluating message selector for message  " + message, e);
         }
         return false;
     }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=1181861&r1=1181860&r2=1181861&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Tue Oct 11 16:00:17 2011
@@ -20,9 +20,14 @@
  */
 package org.apache.qpid.test.unit.topic;
 
+import javax.jms.Connection;
 import javax.jms.InvalidDestinationException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
@@ -30,7 +35,6 @@ import javax.jms.TopicSubscriber;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQTopicSessionAdaptor;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 
@@ -306,51 +310,42 @@ public class TopicSessionTest extends Qp
     }
 
     /**
-     * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
-     * due to a selector can be leaked.
-     * @throws Exception
+     * This tests was added to demonstrate QPID-3542.  The Java Client when used with the CPP Broker was failing to
+     * ack messages received that did not match the selector.  This meant the messages remained indefinitely on the Broker.
      */
-    public void testNonMatchingMessagesDoNotFillQueue() throws Exception
+    public void testNonMatchingMessagesHandledCorrectly() throws Exception
     {
-        AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
-        // Setup Topic
-        AMQTopic topic = new AMQTopic(con, "testNoLocal");
-
-        TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
+        final String topicName = getName();
+        final String clientId = "clientId" + topicName;
+        final Connection con1 = getConnection();
+        final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Topic topic1 = session1.createTopic(topicName);
 
         // Setup subscriber with selector
-        TopicSubscriber selector = session.createSubscriber(topic,  "Selector = 'select'", false);
-        TopicPublisher publisher = session.createPublisher(topic);
-
-        con.start();
-        TextMessage m;
-        TextMessage message;
-
-        // Send non-matching message
-        message = session.createTextMessage("non-matching 1");
-        publisher.publish(message);
-        session.commit();
+        final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
+        final MessageProducer publisher = session1.createProducer(topic1);
 
-        // Send and consume matching message
-        message = session.createTextMessage("hello");
-        message.setStringProperty("Selector", "select");
-
-        publisher.publish(message);
-        session.commit();
-
-        m = (TextMessage) selector.receive(1000);
-        assertNotNull("should have received message", m);
-        assertEquals("Message contents were wrong", "hello", m.getText());
+        con1.start();
 
         // Send non-matching message
-        message = session.createTextMessage("non-matching 2");
-        publisher.publish(message);
-        session.commit();
-
-        // Assert queue count is 0
-        long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
-        assertEquals("Queue depth was wrong", 0, depth);
-
+        final Message sentMessage = session1.createTextMessage("hello");
+        sentMessage.setStringProperty("Selector", "nonMatch");
+        publisher.send(sentMessage);
+
+        // Try to consume non-message, expect this to fail.
+        final Message message1 = subscriberWithSelector.receive(1000);
+        assertNull("should not have received message", message1);
+        subscriberWithSelector.close();
+
+        session1.close();
+
+        // Now recreate the session and subscriber (same clientid) but without selector and check that the message still
+        // is not received.  This defect meant that such a message would be received.
+        final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Topic topic2 = session2.createTopic(topicName);
+
+        final TopicSubscriber sameSubscriberWithoutSelector = session2.createDurableSubscriber(topic2, clientId, null, false);
+        final Message message2 = sameSubscriberWithoutSelector.receive(1000);
+        assertNull("still should not have received message", message2);
     }
 }

Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1181861&r1=1181860&r2=1181861&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Tue Oct 11 16:00:17 2011
@@ -64,9 +64,6 @@ org.apache.qpid.test.unit.client.connect
 // 0-10 c++ broker in cpp.testprofile is started with no auth so won't pass this test
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testPasswordFailureConnection
 
-// c++ broker doesn't do selectors, so this will fail
-org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesDoNotFillQueue
-
 // InVM Broker tests
 org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org