You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Gary Tully (JIRA)" <ji...@apache.org> on 2010/07/22 12:35:55 UTC
[jira] Work started: (AMQ-2580) Durable subscribers receive
nothing when reconnecting with a prefetch size less than the number of
messages that don't match a message selector
[ https://issues.apache.org/activemq/browse/AMQ-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on AMQ-2580 started by Gary Tully.
> Durable subscribers receive nothing when reconnecting with a prefetch size less than the number of messages that don't match a message selector
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-2580
> URL: https://issues.apache.org/activemq/browse/AMQ-2580
> Project: ActiveMQ
> Issue Type: Bug
> Components: Selector
> Affects Versions: 5.3.0
> Environment: phillip:~ henryp$ uname -a
> Darwin phillip.fritz.box 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
> phillip:~ henryp$ java -version
> java version "1.5.0_20"
> Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_20-b02-315)
> Java HotSpot(TM) Client VM (build 1.5.0_20-141, mixed mode, sharing)
> Reporter: Phillip Henry
> Assignee: Gary Tully
> Fix For: 5.4.0
>
> Attachments: selector_patch_and_test.zip
>
>
> 1. Create a connection factory with a message prefetch size of PREFETCH_SIZE.
> 2. Create a durable subscriber to a Topic with a message selector of "a=X".
> 3. Disconnect.
> 4. More than PREFETCH_SIZE messages are then put onto the Topic with a string property "a=Y".
> 5. Just one message is put onto the Topic with string property "a=X".
> 6. The durable subscriber connects again but it does not get the message with string property "a=X". In fact, it gets nothing.
> It appears that, upon reconnecting, the message selector is not respected when retrieving messages from storage.
> I've got a unit test to demonstrate this plus a proposed fix.
> {code}
> ### Eclipse Workspace Patch 1.0
> #P activemq
> Index: activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (revision 900353)
> +++ activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (working copy)
> @@ -306,7 +306,7 @@
> count++;
> container.setBatchEntry(msg.getMessageId(), entry);
> } else {
> - break;
> + //break;
> }
> } else {
> container.reset();
> Index: activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (revision 900353)
> +++ activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (working copy)
> @@ -46,10 +46,11 @@
>
> public boolean recoverMessage(Message message) throws Exception {
> if (listener.hasSpace()) {
> - listener.recoverMessage(message);
> - lastRecovered = message.getMessageId();
> - count++;
> - return true;
> + if (listener.recoverMessage(message)) {
> + lastRecovered = message.getMessageId();
> + count++;
> + return true;
> + }
> }
> return false;
> }
> Index: activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java
> ===================================================================
> --- activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java (revision 0)
> +++ activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java (revision 0)
> @@ -0,0 +1,174 @@
> +package org.apache.activemq.pool;
> +
> +import java.io.File;
> +
> +import javax.jms.JMSException;
> +import javax.jms.MessageConsumer;
> +import javax.jms.MessageProducer;
> +import javax.jms.Session;
> +import javax.jms.TextMessage;
> +import javax.jms.Topic;
> +import javax.jms.TopicConnection;
> +import javax.jms.TopicSession;
> +
> +import junit.framework.TestCase;
> +
> +import org.apache.activemq.ActiveMQConnectionFactory;
> +import org.apache.activemq.ActiveMQPrefetchPolicy;
> +import org.apache.activemq.broker.BrokerService;
> +import org.apache.activemq.store.PersistenceAdapter;
> +
> +public class PrefetchTest extends TestCase {
> +
> + private static final String TOPIC_NAME = "topicName";
> + private static final String CLIENT_ID = "client_id";
> + private static final String textOfSelectedMsg = "good_message";
> +
> + protected TopicConnection connection;
> +
> + private Topic topic;
> + private Session session;
> + private MessageProducer producer;
> + private PooledConnectionFactory connectionFactory;
> + private TopicConnection topicConnection;
> + private String bindAddress;
> + private BrokerService service;
> +
> + protected void setUp() throws Exception {
> + bindAddress = "tcp://localhost:61616";
> + super.setUp();
> + initDurableBroker();
> + initConnectionFactory();
> + initTopic();
> +
> + }
> +
> + protected void tearDown() throws Exception {
> + shutdownClient();
> + connectionFactory.stop();
> + service.stop();
> + super.tearDown();
> + }
> +
> + private void initConnection() throws JMSException {
> + System.out.println("Initializing connection");
> + connection = (TopicConnection) connectionFactory.createConnection();
> + connection.start();
> + }
> +
> + public void testTopicIsDurableSmokeTest() throws Exception {
> +
> + initClient();
> + MessageConsumer consumer = createMessageConsumer();
> + System.out.println("Consuming message");
> + assertNull(consumer.receive(1));
> + shutdownClient();
> + consumer.close();
> +
> + sendMessages();
> + shutdownClient();
> +
> + initClient();
> + consumer = createMessageConsumer();
> +
> + System.out.println("Consuming message");
> + TextMessage answer1 = (TextMessage)consumer.receive(1000);
> + assertNotNull(answer1);
> +
> + consumer.close();
> + }
> +
> + private MessageConsumer createMessageConsumer() throws JMSException {
> + System.out.println("creating durable subscriber");
> + return session.createDurableSubscriber(topic,
> + TOPIC_NAME,
> + "name='value'",
> + false);
> + }
> +
> + private void initClient() throws JMSException {
> + System.out.println("Initializing client");
> +
> + initConnection();
> + initSession();
> + }
> +
> + private void shutdownClient()
> + throws JMSException {
> + System.out.println("Closing session and connection");
> + session.close();
> + connection.close();
> + session = null;
> + connection = null;
> + }
> +
> + private void sendMessages()
> + throws JMSException {
> + initConnection();
> +
> + initSession();
> +
> + System.out.println("Creating producer");
> + producer = session.createProducer(topic);
> +
> + sendMessageThatFailsSelection();
> +
> + sendMessage(textOfSelectedMsg, "value");
> + }
> +
> + private void initSession() throws JMSException {
> + System.out.println("Initializing session");
> + session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> + }
> +
> + private void sendMessageThatFailsSelection() throws JMSException {
> + for (int i = 0 ; i < 5 ; i++) {
> + String textOfNotSelectedMsg = "Msg_" + i;
> + sendMessage(textOfNotSelectedMsg, "not_value");
> + System.out.println("#");
> + }
> + }
> +
> + private void sendMessage(
> + String msgText,
> + String propertyValue) throws JMSException {
> + System.out.println("Creating message: " + msgText);
> + TextMessage messageToSelect = session.createTextMessage(msgText);
> + messageToSelect.setStringProperty("name", propertyValue);
> + System.out.println("Sending message");
> + producer.send(messageToSelect);
> + }
> +
> + protected void initConnectionFactory() {
> + ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory();
> + connectionFactory = new PooledConnectionFactory(activeMqConnectionFactory);
> + }
> +
> +
> + private ActiveMQConnectionFactory createActiveMqConnectionFactory() {
> + ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory();
> + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
> + prefetchPolicy.setDurableTopicPrefetch(2);
> + activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy );
> + activeMqConnectionFactory.setClientID(CLIENT_ID);
> + return activeMqConnectionFactory;
> + }
> +
> + private void initDurableBroker() throws Exception {
> + service = new BrokerService();
> + PersistenceAdapter persistenceAdaptor = service.getPersistenceAdapter();
> + File file = new File("phills_durable_dir");
> + persistenceAdaptor.setDirectory(file);
> + service.setTransportConnectorURIs(new String[] { bindAddress } );
> + service.setPersistent(true);
> + service.setUseJmx(true);
> + service.start();
> +
> + }
> +
> + private void initTopic() throws JMSException {
> + topicConnection = (TopicConnection) connectionFactory.createConnection();
> + TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> + topic = topicSession.createTopic(TOPIC_NAME);
> + }
> +}
> {code}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.