You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by akos <ak...@p92.hu> on 2010/04/13 12:07:07 UTC
Durable subscription with selector problem.
Hi,
I'm using durable subscriptions with message selectors. After a while, and
after some broker restarts, I noticed some weird message delivery: some of
the messages were not delivered immediately, but only later, together with
another message.
I dug around and found that, right after a broker restart, the pending queue
of the subscription contains all the messages that do NOT fulfill the
subscription's selector.
Please check this testcase:
public class SubscriptionSelectorTest extends TestSupport {
MBeanServer mbs;
BrokerService broker = null;
ActiveMQTopic topic;
ActiveMQConnection consumerConnection = null, producerConnection = null;
Session producerSession;
MessageProducer producer;
private int received = 0;
public void testSubscription() throws Exception {
openConsumer();
for (int i = 0; i < 4000; i++) {
sendMessage(false);
}
Thread.sleep(1000);
// this won't fail
assertEquals("Invalid message received.", 0, received);
// this won't fail if the broker is a new one
//assertEquals("Subscription pending queue is not empty.",
getPendingQueueSize(), 0);
closeProducer();
closeConsumer();
stopBroker();
startBroker();
openConsumer();
// Fail! There are messages in the subscription. Non of them
statisfies the selector!
// This is somehow the root of the problem.
//assertEquals("Subscription pending queue is not empty.",
getPendingQueueSize(), 0);
sendMessage(true);
Thread.sleep(1000);
// Fail, because of the invalid messages in the subscription pending
queue
assertEquals("Message is not recieved.", 1, received);
sendMessage(true);
Thread.sleep(100);
// The funny thing is this won't fail because the first message is
delivered together
// with the second one.
assertEquals("Message is not recieved.", 2, received);
}
private void openConsumer() throws Exception {
consumerConnection = (ActiveMQConnection) createConnection();
consumerConnection.setClientID("cliID");
consumerConnection.start();
Session session = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicSubscriber subscriber = session.createDurableSubscriber(topic,
"subName", "filter=true", false);
subscriber.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
received++;
}
});
}
private void closeConsumer() throws JMSException {
if (consumerConnection != null)
consumerConnection.close();
consumerConnection = null;
}
private void sendMessage(boolean filter) throws Exception {
if (producerConnection == null) {
producerConnection = (ActiveMQConnection) createConnection();
producerConnection.start();
producerSession = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
producer = producerSession.createProducer(topic);
}
Message message = producerSession.createMessage();
message.setBooleanProperty("filter", filter);
producer.send(message);
}
private void closeProducer() throws JMSException {
if (producerConnection != null)
producerConnection.close();
producerConnection = null;
}
private int getPendingQueueSize() throws Exception {
ObjectName[] subs =
broker.getAdminView().getDurableTopicSubscribers();
for (ObjectName sub: subs) {
if ("cliID".equals(mbs.getAttribute(sub, "ClientId"))) {
Integer size = (Integer) mbs.getAttribute(sub,
"PendingQueueSize");
return size != null ? size : 0;
}
}
assertTrue(false);
return -1;
}
private void startBroker() throws Exception {
broker =
BrokerFactory.createBroker("broker:(vm://localhost)?persistent=true&useJmx=true&brokerName=test-broker");
broker.start();
}
private void stopBroker() throws Exception {
if (broker != null)
broker.stop();
broker = null;
}
protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
return new
ActiveMQConnectionFactory("vm://test-broker?jms.watchTopicAdvisories=false&waitForStart=5000&create=false");
}
@Override
protected void setUp() throws Exception {
super.setUp();
startBroker();
topic = (ActiveMQTopic) createDestination();
mbs = ManagementFactory.getPlatformMBeanServer();
}
@Override
protected void tearDown() throws Exception {
stopBroker();
super.tearDown();
}
}
I've also created a Jira issue:
https://issues.apache.org/activemq/browse/AMQ-2695
Thanks for your help!
--
View this message in context: http://old.nabble.com/Durable-subscription-with-selector-problem.-tp28224414p28224414.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.