Posted to dev@qpid.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2013/12/06 22:29:37 UTC

[jira] [Created] (QPID-5405) [JMS Client] QPid AMQP JMS client does not appear to check for expired messages that have been prefetched.

Timothy Bish created QPID-5405:
----------------------------------

             Summary: [JMS Client] QPid AMQP JMS client does not appear to check for expired messages that have been prefetched.
                 Key: QPID-5405
                 URL: https://issues.apache.org/jira/browse/QPID-5405
             Project: Qpid
          Issue Type: Bug
          Components: Java Client
    Affects Versions: 0.24, 0.26
            Reporter: Timothy Bish


The JMS client does not appear to check message expiration for messages that have already been received and stored in the prefetch buffer and are later processed by a synchronous consumer or a slow asynchronous consumer. The broker used for testing was ActiveMQ v5.10-SNAPSHOT.

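The following test shows the basic idea: the second message is sent with a TTL of 100 ms, the test sleeps for 2 seconds, and the subsequent receive() is expected to return null.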

{code}
    @Test(timeout=30000)
    public void testTTL() throws Exception {
        Connection connection = null;
        try {
            QueueImpl queue = new QueueImpl("queue://" + name);
            connection = createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
            MessageProducer producer = session.createProducer(queue);

            // First message: 1 second TTL, consumed right away, so it should arrive.
            producer.setTimeToLive(1000);
            Message toSend = session.createTextMessage("Sample text");
            producer.send(toSend);
            MessageConsumer consumer = session.createConsumer(queue);
            Message received = consumer.receive(5000);
            assertNotNull(received);
            LOG.info("Message JMSExpiration = {}", received.getJMSExpiration());

            // Second message: 100 ms TTL, left in the prefetch buffer for 2 seconds.
            producer.setTimeToLive(100);
            producer.send(toSend);
            TimeUnit.SECONDS.sleep(2);
            received = consumer.receive(5000);
            if (received != null) {
                LOG.info("Message JMSExpiration = {} JMSTimeStamp = {} TTL = {}",
                         new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(),
                                        received.getJMSExpiration() - received.getJMSTimestamp()});
            }
            // The message expired long before this receive, so nothing should be delivered.
            assertNull(received);
        } finally {
            // Guard against createConnection() having failed before assignment.
            if (connection != null) {
                connection.close();
            }
        }
    }
{code}

Adding some debug output to MessageConsumerImpl shows that expired messages are being dispatched.

{code}
    private MessageImpl receiveImpl(long timeout) throws JMSException
    {
        org.apache.qpid.amqp_1_0.client.Message msg;
        boolean redelivery;
        if(_replaymessages.isEmpty())
        {
            checkReceiverError();
            msg = receive0(timeout);
            redelivery = false;
        }
        else
        {
            msg = _replaymessages.remove(0);
            redelivery = true;
        }

        if(msg != null)
        {
            if (isExpired(msg)) {
                System.out.println("Received expired message:");
            }

            preReceiveAction(msg);
        }

        return createJMSMessage(msg, redelivery);
    }

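    // Debug helper: true if the message's creation time plus its TTL is already in the past.
    private boolean isExpired(org.apache.qpid.amqp_1_0.client.Message msg)
    {
        // The AMQP header TTL is expressed in milliseconds.
        UnsignedInteger ttl = msg.getHeader().getTtl();
        Date timeStamp = msg.getProperties().getCreationTime();
        // Without both a TTL and a creation time, expiry cannot be determined.
        if (ttl != null && timeStamp != null)
        {
            long timeNow = System.currentTimeMillis();
            if (timeNow > timeStamp.getTime() + ttl.longValue())
            {
                return true;
            }
        }

        return false;
    }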
{code}
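
For illustration only, here is a minimal standalone sketch of how a consumer might skip expired entries when pulling from a prefetch buffer. It uses the same comparison as the isExpired() debug helper above. PrefetchedMessage, PrefetchExpirySketch and nextLive() are hypothetical names invented for this sketch and are not part of the Qpid client. A real fix would presumably also need to settle or release the skipped delivery with the broker, which this sketch does not attempt.

```java
import java.util.ArrayDeque;
import java.util.Date;
import java.util.Deque;

// Hypothetical stand-in for a prefetched message; NOT a Qpid class.
final class PrefetchedMessage {
    final String body;
    final Long ttlMillis;     // null means no TTL was set
    final Date creationTime;  // null means no creation time was set

    PrefetchedMessage(String body, Long ttlMillis, Date creationTime) {
        this.body = body;
        this.ttlMillis = ttlMillis;
        this.creationTime = creationTime;
    }
}

// Hypothetical helper showing the check applied at receive time.
final class PrefetchExpirySketch {

    // Same comparison as the debug helper: creation time + TTL is in the past.
    static boolean isExpired(PrefetchedMessage msg, long nowMillis) {
        if (msg.ttlMillis == null || msg.creationTime == null) {
            return false; // cannot determine expiry, treat as live
        }
        return nowMillis > msg.creationTime.getTime() + msg.ttlMillis;
    }

    // Returns the next non-expired message, discarding expired ones.
    // Returns null if the buffer is drained without finding a live message.
    static PrefetchedMessage nextLive(Deque<PrefetchedMessage> prefetch, long nowMillis) {
        PrefetchedMessage msg;
        while ((msg = prefetch.poll()) != null) {
            if (!isExpired(msg, nowMillis)) {
                return msg;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Deque<PrefetchedMessage> prefetch = new ArrayDeque<>();
        long now = System.currentTimeMillis();
        // Created 2 seconds ago with a 100 ms TTL, like the second message in the test.
        prefetch.add(new PrefetchedMessage("Sample text", 100L, new Date(now - 2000)));
        PrefetchedMessage next = nextLive(prefetch, now);
        System.out.println(next == null ? "no live message" : next.body);
    }
}
```

With a check like this in the receive path, the second receive() in the test above would be expected to see no message, since the only buffered entry is already past its TTL.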



