You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2011/10/18 13:20:53 UTC

svn commit: r1185580 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/unit/topic/

Author: kwall
Date: Tue Oct 18 11:20:53 2011
New Revision: 1185580

URL: http://svn.apache.org/viewvc?rev=1185580&view=rev
Log:
QPID-3542: ensure session complete is sent for filtered-out messages

Applied patch from Andrew MacBean <an...@gmail.com> and myself.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1185580&r1=1185579&r2=1185580&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Oct 18 11:20:53 2011
@@ -294,23 +294,34 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-    void messageAcknowledge(RangeSet ranges, boolean accept)
+    void messageAcknowledge(final RangeSet ranges, final boolean accept)
     {
         messageAcknowledge(ranges,accept,false);
     }
     
-    void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+    void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
     {
-        Session ssn = getQpidSession();
-        for (Range range : ranges)
+        final Session ssn = getQpidSession();
+        flushProcessed(ranges,accept);
+        if (accept)
         {
-            ssn.processed(range);
+            ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
         }
-        ssn.flushProcessed(accept ? BATCH : NONE);
-        if (accept)
+    }
+
+    /**
+     * Flush any outstanding commands. This causes session complete to be sent.
+     * @param ranges the range of command ids.
+     * @param batch true if batched.
+     */
+    void flushProcessed(final RangeSet ranges, final boolean batch)
+    {
+        final Session ssn = getQpidSession();
+        for (final Range range : ranges)
         {
-            ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
+            ssn.processed(range);
         }
+        ssn.flushProcessed(batch ? BATCH : NONE);
     }
 
     /**

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1185580&r1=1185579&r2=1185580&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Oct 18 11:20:53 2011
@@ -269,8 +269,9 @@ public class BasicMessageConsumer_0_10 e
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("filterMessage - not ack'ing messaage as not aquired");
+                    _logger.debug("filterMessage - not ack'ing message as not acquired");
                 }
+                flushUnwantedMessage(message);
             }
         }
 
@@ -312,6 +313,26 @@ public class BasicMessageConsumer_0_10 e
     }
 
     /**
+     * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated
+     * processed to ensure their AMQP command-id is marked completed.
+     *
+     * @param message The unwanted message to be flushed
+     * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
+     */
+    private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
+    {
+        final RangeSet ranges = new RangeSet();
+        ranges.add((int) message.getDeliveryTag());
+        _0_10session.flushProcessed(ranges,false);
+
+        final AMQException amqe = _0_10session.getCurrentException();
+        if (amqe != null)
+        {
+            throw amqe;
+        }
+    }
+
+    /**
      * Acquire a message
      *
      * @param message The message to be acquired

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=1185580&r1=1185579&r2=1185580&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Tue Oct 18 11:20:53 2011
@@ -33,6 +33,7 @@ import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -320,6 +321,7 @@ public class TopicSessionTest extends Qp
         final Connection con1 = getConnection();
         final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         final Topic topic1 = session1.createTopic(topicName);
+        final AMQQueue internalNameOnBroker = new AMQQueue("amq.topic", "clientid" + ":" + clientId);
 
         // Setup subscriber with selector
         final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
@@ -339,13 +341,9 @@ public class TopicSessionTest extends Qp
 
         session1.close();
 
-        // Now recreate the session and subscriber (same clientid) but without selector and check that the message still
-        // is not received.  This defect meant that such a message would be received.
+        // Now verify queue depth on broker.
         final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        final Topic topic2 = session2.createTopic(topicName);
-
-        final TopicSubscriber sameSubscriberWithoutSelector = session2.createDurableSubscriber(topic2, clientId, null, false);
-        final Message message2 = sameSubscriberWithoutSelector.receive(1000);
-        assertNull("still should not have received message", message2);
+        final long depth = ((AMQSession) session2).getQueueDepth(internalNameOnBroker);
+        assertEquals("Expected queue depth of zero", 0, depth);
     }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org