Posted to dev@qpid.apache.org by "Rob Godfrey (JIRA)" <ji...@apache.org> on 2016/12/10 14:26:58 UTC
[jira] [Closed] (QPID-5405) [JMS Client] QPid AMQP JMS client does
not appear to check for expired messages that have been prefetched.
[ https://issues.apache.org/jira/browse/QPID-5405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rob Godfrey closed QPID-5405.
-----------------------------
Resolution: Won't Fix
This client has been replaced by the new Qpid JMS library.
> [JMS Client] QPid AMQP JMS client does not appear to check for expired messages that have been prefetched.
> ----------------------------------------------------------------------------------------------------------
>
> Key: QPID-5405
> URL: https://issues.apache.org/jira/browse/QPID-5405
> Project: Qpid
> Issue Type: Bug
> Components: JMS AMQP 1.0 Client
> Affects Versions: 0.24, 0.26
> Reporter: Timothy Bish
>
> The JMS client does not seem to check message expiration for messages that have been received into the prefetch buffer and are only processed later by a synchronous consumer or a slow asynchronous consumer. The broker used for testing was ActiveMQ v5.10-SNAPSHOT.
> The following test shows the basic idea.
> {code}
> @Test(timeout=30000)
> public void testTTL() throws Exception {
>     Connection connection = null;
>     try {
>         QueueImpl queue = new QueueImpl("queue://" + name);
>         connection = createConnection();
>         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         connection.start();
>         MessageProducer producer = session.createProducer(queue);
>         producer.setTimeToLive(1000);
>         Message toSend = session.createTextMessage("Sample text");
>         producer.send(toSend);
>         MessageConsumer consumer = session.createConsumer(queue);
>         Message received = consumer.receive(5000);
>         assertNotNull(received);
>         LOG.info("Message JMSExpiration = {}", received.getJMSExpiration());
>         // Send again with a 100 ms TTL, then wait well past that before receiving.
>         producer.setTimeToLive(100);
>         producer.send(toSend);
>         TimeUnit.SECONDS.sleep(2);
>         received = consumer.receive(5000);
>         if (received != null) {
>             LOG.info("Message JMSExpiration = {} JMSTimeStamp = {} TTL = {}",
>                 new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(),
>                     received.getJMSExpiration() - received.getJMSTimestamp()});
>         }
>         // The second message should have expired and must not be delivered.
>         assertNull(received);
>     } finally {
>         // createConnection() may have thrown, leaving connection null.
>         if (connection != null) {
>             connection.close();
>         }
>     }
> }
> {code}
> Adding some debug output to MessageConsumerImpl shows that expired messages are being dispatched.
> {code}
> private MessageImpl receiveImpl(long timeout) throws JMSException
> {
>     org.apache.qpid.amqp_1_0.client.Message msg;
>     boolean redelivery;
>     if(_replaymessages.isEmpty())
>     {
>         checkReceiverError();
>         msg = receive0(timeout);
>         redelivery = false;
>     }
>     else
>     {
>         msg = _replaymessages.remove(0);
>         redelivery = true;
>     }
>     if(msg != null)
>     {
>         if (isExpired(msg)) {
>             System.out.println("Received expired message:");
>         }
>         preReceiveAction(msg);
>     }
>     return createJMSMessage(msg, redelivery);
> }
>
> private boolean isExpired(org.apache.qpid.amqp_1_0.client.Message msg)
> {
>     UnsignedInteger ttl = msg.getHeader().getTtl();
>     Date timeStamp = msg.getProperties().getCreationTime();
>     if (ttl != null && timeStamp != null) {
>         long timeNow = System.currentTimeMillis();
>         if (timeNow > timeStamp.getTime() + ttl.longValue()) {
>             return true;
>         }
>     }
>     return false;
> }
> {code}
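For illustration only, the check the report asks for could look roughly like the sketch below: expired entries are skipped when a message is taken out of the prefetch buffer, instead of merely being logged. This is not the Qpid client's code. PrefetchExpirySketch, PrefetchedMessage, onPrefetch and poll are hypothetical names, the creation time and TTL are plain millisecond values standing in for the AMQP properties and header fields read by isExpired above, and the current time is passed in so the behaviour is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical prefetch buffer that drops expired messages on receive. */
public class PrefetchExpirySketch {

    /** Stand-in for a prefetched message; not a Qpid class. */
    static final class PrefetchedMessage {
        final String body;
        final Long creationTimeMillis; // null if the sender did not set it
        final Long ttlMillis;          // null means the message never expires

        PrefetchedMessage(String body, Long creationTimeMillis, Long ttlMillis) {
            this.body = body;
            this.creationTimeMillis = creationTimeMillis;
            this.ttlMillis = ttlMillis;
        }

        /** Same rule as isExpired above: expired once now > creation time + TTL. */
        boolean isExpired(long nowMillis) {
            return creationTimeMillis != null
                && ttlMillis != null
                && nowMillis > creationTimeMillis + ttlMillis;
        }
    }

    private final Deque<PrefetchedMessage> buffer = new ArrayDeque<>();
    private int discarded;

    /** Called when a message arrives ahead of the application asking for it. */
    void onPrefetch(PrefetchedMessage message) {
        buffer.addLast(message);
    }

    /** Returns the next non-expired message, or null once the buffer is drained. */
    PrefetchedMessage poll(long nowMillis) {
        PrefetchedMessage message;
        while ((message = buffer.pollFirst()) != null) {
            if (!message.isExpired(nowMillis)) {
                return message;
            }
            discarded++; // expired while waiting in the buffer; skip it
        }
        return null;
    }

    int discardedCount() {
        return discarded;
    }

    public static void main(String[] args) {
        PrefetchExpirySketch consumer = new PrefetchExpirySketch();
        long sentAt = 1_000L;
        consumer.onPrefetch(new PrefetchedMessage("short-lived", sentAt, 100L));
        consumer.onPrefetch(new PrefetchedMessage("long-lived", sentAt, 60_000L));

        // Two seconds later the 100 ms message has expired and is skipped.
        PrefetchedMessage next = consumer.poll(sentAt + 2_000L);
        System.out.println(next.body + ", discarded=" + consumer.discardedCount());
        // prints "long-lived, discarded=1"
    }
}
```

What a real client should do with a discarded message (for example, how to settle it with the broker) is outside the scope of this sketch.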