You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by akos <ak...@p92.hu> on 2010/04/13 12:07:07 UTC

Durable subscription with selector problem.

Hi,

I'm using durable subscriptions with message selectors. After a while, and
after some broker restarts, I noticed some weird message delivery: some of
the messages were not delivered immediately, but only later, together with
another message.
I dug around and found that, right after a broker restart, the pending queue
contains all the messages that do NOT fulfill the selector of the
subscription.

Please check this testcase:

public class SubscriptionSelectorTest extends TestSupport {

    MBeanServer mbs;
    BrokerService broker = null;
    ActiveMQTopic topic;

    ActiveMQConnection consumerConnection = null, producerConnection = null;
    Session producerSession;
    MessageProducer producer;

    private int received = 0;

    public void testSubscription() throws Exception {
        openConsumer();
        for (int i = 0; i < 4000; i++) {
            sendMessage(false);
        }
        Thread.sleep(1000);

        // this won't fail
        assertEquals("Invalid message received.", 0, received);
        // this won't fail if the broker is a new one
        //assertEquals("Subscription pending queue is not empty.",
getPendingQueueSize(), 0);

        closeProducer();
        closeConsumer();
        stopBroker();

        startBroker();
        openConsumer();

        // Fail! There are messages in the subscription. Non of them
statisfies the selector!
        // This is somehow the root of the problem.
        //assertEquals("Subscription pending queue is not empty.",
getPendingQueueSize(), 0);

        sendMessage(true);
        Thread.sleep(1000);
        // Fail, because of the invalid messages in the subscription pending
queue
        assertEquals("Message is not recieved.", 1, received);

        sendMessage(true);
        Thread.sleep(100);
        // The funny thing is this won't fail because the first message is
delivered together
        // with the second one.
        assertEquals("Message is not recieved.", 2, received);
    }

    private void openConsumer() throws Exception {
        consumerConnection = (ActiveMQConnection) createConnection();
        consumerConnection.setClientID("cliID");
        consumerConnection.start();
        Session session = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber subscriber = session.createDurableSubscriber(topic,
"subName", "filter=true", false);

        subscriber.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                received++;
            }
        });
    }

    private void closeConsumer() throws JMSException {
        if (consumerConnection != null)
            consumerConnection.close();
        consumerConnection = null;
    }

    private void sendMessage(boolean filter) throws Exception {
        if (producerConnection == null) {
            producerConnection = (ActiveMQConnection) createConnection();
            producerConnection.start();
            producerSession = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
            producer = producerSession.createProducer(topic);
        }

        Message message = producerSession.createMessage();
        message.setBooleanProperty("filter", filter);
        producer.send(message);
    }

    private void closeProducer() throws JMSException {
        if (producerConnection != null)
            producerConnection.close();
        producerConnection = null;
    }

    private int getPendingQueueSize() throws Exception {
        ObjectName[] subs =
broker.getAdminView().getDurableTopicSubscribers();
        for (ObjectName sub: subs) {
            if ("cliID".equals(mbs.getAttribute(sub, "ClientId"))) {
                Integer size = (Integer) mbs.getAttribute(sub,
"PendingQueueSize");
                return size != null ? size : 0;
            }
        }
        assertTrue(false);
        return -1;
    }

    private void startBroker() throws Exception {
        broker =
BrokerFactory.createBroker("broker:(vm://localhost)?persistent=true&useJmx=true&brokerName=test-broker");
        broker.start();
    }

    private void stopBroker() throws Exception {
        if (broker != null)
            broker.stop();
        broker = null;
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
        return new
ActiveMQConnectionFactory("vm://test-broker?jms.watchTopicAdvisories=false&waitForStart=5000&create=false");
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();

        startBroker();
        topic = (ActiveMQTopic) createDestination();
        mbs = ManagementFactory.getPlatformMBeanServer();
    }

    @Override
    protected void tearDown() throws Exception {
        stopBroker();
        super.tearDown();
    }
}

I've made a Jira issue also:
https://issues.apache.org/activemq/browse/AMQ-2695
Thanks for your help!

-- 
View this message in context: http://old.nabble.com/Durable-subscription-with-selector-problem.-tp28224414p28224414.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.