You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2011/10/18 13:20:53 UTC
svn commit: r1185580 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
systests/src/main/java/org/apache/qpid/test/unit/topic/
Author: kwall
Date: Tue Oct 18 11:20:53 2011
New Revision: 1185580
URL: http://svn.apache.org/viewvc?rev=1185580&view=rev
Log:
QPID-3542: ensure session complete is sent for filtered-out messages
Applied patch from Andrew MacBean <an...@gmail.com> and me.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1185580&r1=1185579&r2=1185580&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Oct 18 11:20:53 2011
@@ -294,23 +294,34 @@ public class AMQSession_0_10 extends AMQ
}
}
- void messageAcknowledge(RangeSet ranges, boolean accept)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept)
{
messageAcknowledge(ranges,accept,false);
}
- void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
{
- Session ssn = getQpidSession();
- for (Range range : ranges)
+ final Session ssn = getQpidSession();
+ flushProcessed(ranges,accept);
+ if (accept)
{
- ssn.processed(range);
+ ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
}
- ssn.flushProcessed(accept ? BATCH : NONE);
- if (accept)
+ }
+
+ /**
+ * Flush any outstanding commands. This causes session complete to be sent.
+ * @param ranges the range of command ids.
+ * @param batch true if batched.
+ */
+ void flushProcessed(final RangeSet ranges, final boolean batch)
+ {
+ final Session ssn = getQpidSession();
+ for (final Range range : ranges)
{
- ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
+ ssn.processed(range);
}
+ ssn.flushProcessed(batch ? BATCH : NONE);
}
/**
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1185580&r1=1185579&r2=1185580&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Oct 18 11:20:53 2011
@@ -269,8 +269,9 @@ public class BasicMessageConsumer_0_10 e
{
if (_logger.isDebugEnabled())
{
- _logger.debug("filterMessage - not ack'ing messaage as not aquired");
+ _logger.debug("filterMessage - not ack'ing message as not acquired");
}
+ flushUnwantedMessage(message);
}
}
@@ -312,6 +313,26 @@ public class BasicMessageConsumer_0_10 e
}
/**
+ * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated
+ * processed to ensure their AMQP command-id is marked completed.
+ *
+ * @param message The unwanted message to be flushed
+ * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
+ */
+ private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
+ {
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.flushProcessed(ranges,false);
+
+ final AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
+ }
+
+ /**
* Acquire a message
*
* @param message The message to be acquired
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=1185580&r1=1185579&r2=1185580&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Tue Oct 18 11:20:53 2011
@@ -33,6 +33,7 @@ import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -320,6 +321,7 @@ public class TopicSessionTest extends Qp
final Connection con1 = getConnection();
final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Topic topic1 = session1.createTopic(topicName);
+ final AMQQueue internalNameOnBroker = new AMQQueue("amq.topic", "clientid" + ":" + clientId);
// Setup subscriber with selector
final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
@@ -339,13 +341,9 @@ public class TopicSessionTest extends Qp
session1.close();
- // Now recreate the session and subscriber (same clientid) but without selector and check that the message still
- // is not received. This defect meant that such a message would be received.
+ // Now verify queue depth on broker.
final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Topic topic2 = session2.createTopic(topicName);
-
- final TopicSubscriber sameSubscriberWithoutSelector = session2.createDurableSubscriber(topic2, clientId, null, false);
- final Message message2 = sameSubscriberWithoutSelector.receive(1000);
- assertNull("still should not have received message", message2);
+ final long depth = ((AMQSession) session2).getQueueDepth(internalNameOnBroker);
+ assertEquals("Expected queue depth of zero", 0, depth);
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org