Posted to dev@qpid.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2013/12/06 22:29:37 UTC
[jira] [Created] (QPID-5405) [JMS Client] QPid AMQP JMS client does
not appear to check for expired messages that have been prefetched.
Timothy Bish created QPID-5405:
----------------------------------
Summary: [JMS Client] QPid AMQP JMS client does not appear to check for expired messages that have been prefetched.
Key: QPID-5405
URL: https://issues.apache.org/jira/browse/QPID-5405
Project: Qpid
Issue Type: Bug
Components: Java Client
Affects Versions: 0.24, 0.26
Reporter: Timothy Bish
The JMS client does not seem to check message expiration for messages that have already been received into the prefetch buffer and are only later processed by a synchronous consumer or a slow asynchronous consumer. The broker used for testing was ActiveMQ 5.10-SNAPSHOT.
The following test shows the basic idea.
{code}
@Test(timeout=30000)
public void testTTL() throws Exception {
    Connection connection = null;
    try {
        QueueImpl queue = new QueueImpl("queue://" + name);
        connection = createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();

        MessageProducer producer = session.createProducer(queue);
        producer.setTimeToLive(1000);
        Message toSend = session.createTextMessage("Sample text");
        producer.send(toSend);

        // The first message is consumed right away, well inside its 1000 ms TTL.
        MessageConsumer consumer = session.createConsumer(queue);
        Message received = consumer.receive(5000);
        assertNotNull(received);
        LOG.info("Message JMSExpiration = {}", received.getJMSExpiration());

        // The second message has a 100 ms TTL. About 2 s pass before receive()
        // is called, so it should have expired and must not be delivered.
        producer.setTimeToLive(100);
        producer.send(toSend);
        TimeUnit.SECONDS.sleep(2);
        received = consumer.receive(5000);
        if (received != null) {
            LOG.info("Message JMSExpiration = {} JMSTimeStamp = {} TTL = {}",
                     new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(),
                                    received.getJMSExpiration() - received.getJMSTimestamp()});
        }
        assertNull(received);
    } finally {
        // Guard against createConnection() having failed before assignment.
        if (connection != null) {
            connection.close();
        }
    }
}
{code}
Adding some debug output to MessageConsumerImpl shows that expired messages are being dispatched.
{code}
private MessageImpl receiveImpl(long timeout) throws JMSException
{
    org.apache.qpid.amqp_1_0.client.Message msg;
    boolean redelivery;
    if(_replaymessages.isEmpty())
    {
        checkReceiverError();
        msg = receive0(timeout);
        redelivery = false;
    }
    else
    {
        msg = _replaymessages.remove(0);
        redelivery = true;
    }
    if(msg != null)
    {
        // Debug only: report the expired message, but still dispatch it.
        if (isExpired(msg)) {
            System.out.println("Received expired message:");
        }
        preReceiveAction(msg);
    }
    return createJMSMessage(msg, redelivery);
}

private boolean isExpired(org.apache.qpid.amqp_1_0.client.Message msg)
{
    UnsignedInteger ttl = msg.getHeader().getTtl();
    Date timeStamp = msg.getProperties().getCreationTime();
    if (ttl != null && timeStamp != null) {
        long timeNow = System.currentTimeMillis();
        // Expired once the current time passes creation time + TTL.
        if (timeNow > timeStamp.getTime() + ttl.longValue()) {
            return true;
        }
    }
    return false;
}
{code}
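For illustration only, here is a minimal, dependency-free sketch of the behaviour the test expects: a client-side buffer that discards expired entries before handing a message to the consumer. The names PrefetchExpirySketch, Msg and Buffer are hypothetical and are not Qpid classes. The expiry rule mirrors the isExpired() debug helper above (creation time + TTL compared with the current time). This is not a proposed patch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch; none of these types exist in the Qpid client.
final class PrefetchExpirySketch {

    /** Stand-in for a prefetched message carrying a creation time and a TTL. */
    static final class Msg {
        final String body;
        final Long creationTimeMillis; // null if the sender did not set it
        final Long ttlMillis;          // null means the message never expires

        Msg(String body, Long creationTimeMillis, Long ttlMillis) {
            this.body = body;
            this.creationTimeMillis = creationTimeMillis;
            this.ttlMillis = ttlMillis;
        }

        /** Same rule as the debug helper: expired once now > creation time + TTL. */
        boolean isExpired(long nowMillis) {
            if (creationTimeMillis == null || ttlMillis == null) {
                return false;
            }
            return nowMillis > creationTimeMillis + ttlMillis;
        }
    }

    /** Stand-in for the client-side prefetch buffer. */
    static final class Buffer {
        private final Deque<Msg> messages = new ArrayDeque<>();

        void add(Msg msg) {
            messages.addLast(msg);
        }

        /**
         * Returns the next message that has not expired at nowMillis, or null
         * if the buffer holds none. Expired messages are silently dropped here;
         * a real client would presumably also have to settle them with the broker.
         */
        Msg poll(long nowMillis) {
            Msg msg;
            while ((msg = messages.pollFirst()) != null) {
                if (!msg.isExpired(nowMillis)) {
                    return msg;
                }
            }
            return null;
        }
    }
}
```

With a check of this kind on the receive path, the second receive() in the test above would return null for the message with the 100 ms TTL instead of delivering it.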