You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Gary Tully (JIRA)" <ji...@apache.org> on 2010/07/23 17:39:55 UTC

[jira] Resolved: (AMQ-2580) Durable subscribers receives nothing when reconnecting with a prefetch size less than the number of messages that don't match a message selector

     [ https://issues.apache.org/activemq/browse/AMQ-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Tully resolved AMQ-2580.
-----------------------------

    Resolution: Fixed

Resolved in revision 967134. The patch and the test case were both great, thanks. I also completed the implementation for JDBC and KahaDB.

> Durable subscribers receives nothing when reconnecting with a prefetch size less than the number of messages that don't match a message selector
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2580
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2580
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Selector
>    Affects Versions: 5.3.0
>         Environment: phillip:~ henryp$ uname -a
> Darwin phillip.fritz.box 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
> phillip:~ henryp$ java -version
> java version "1.5.0_20"
> Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_20-b02-315)
> Java HotSpot(TM) Client VM (build 1.5.0_20-141, mixed mode, sharing)
>            Reporter: Phillip Henry
>            Assignee: Gary Tully
>             Fix For: 5.4.0
>
>         Attachments: selector_patch_and_test.zip
>
>
> 1. Create a connection factory with a message prefetch size of PREFETCH_SIZE.
> 2. Create a durable subscriber to a Topic with a message selector of "a=X". 
> 3. Disconnect.
> 4. Publish more than PREFETCH_SIZE messages to the Topic, each with the string property a="Y" (so none of them match the selector).
> 5. Publish just one message to the Topic with the string property a="X" (the only message that matches the selector).
> 6. Reconnect the durable subscriber. It does not receive the message with string property a="X"; in fact, it receives nothing.
> It appears that, upon reconnecting, the message selector is not respected when messages are retrieved from storage.
> I've got a unit test to demonstrate this plus a proposed fix.
> {code}
> ### Eclipse Workspace Patch 1.0
> #P activemq
> Index: activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java       (revision 900353)
> +++ activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java       (working copy)
> @@ -306,7 +306,7 @@
>                                  count++;
>                                  container.setBatchEntry(msg.getMessageId(), entry);
>                              } else {
> -                                break;
> +                                //break;
>                              }
>                          } else {
>                              container.reset();
> Index: activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java      (revision 900353)
> +++ activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java      (working copy)
> @@ -46,10 +46,11 @@
>      
>      public boolean recoverMessage(Message message) throws Exception {
>          if (listener.hasSpace()) {
> -            listener.recoverMessage(message);
> -            lastRecovered = message.getMessageId();
> -            count++;
> -            return true;
> +            if (listener.recoverMessage(message)) {
> +                lastRecovered = message.getMessageId();
> +                count++;
> +                return true;
> +            }
>          }
>          return false;
>      }
> Index: activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java
> ===================================================================
> --- activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java      (revision 0)
> +++ activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java      (revision 0)
> @@ -0,0 +1,174 @@
> +package org.apache.activemq.pool;
> +
> +import java.io.File;
> +
> +import javax.jms.JMSException;
> +import javax.jms.MessageConsumer;
> +import javax.jms.MessageProducer;
> +import javax.jms.Session;
> +import javax.jms.TextMessage;
> +import javax.jms.Topic;
> +import javax.jms.TopicConnection;
> +import javax.jms.TopicSession;
> +
> +import junit.framework.TestCase;
> +
> +import org.apache.activemq.ActiveMQConnectionFactory;
> +import org.apache.activemq.ActiveMQPrefetchPolicy;
> +import org.apache.activemq.broker.BrokerService;
> +import org.apache.activemq.store.PersistenceAdapter;
> +
> +public class PrefetchTest extends TestCase {
> +
> +       private static final String TOPIC_NAME = "topicName";
> +       private static final String CLIENT_ID = "client_id";
> +       private static final String textOfSelectedMsg = "good_message";
> +
> +       protected TopicConnection connection;
> +
> +       private Topic topic;
> +       private Session session;
> +       private MessageProducer producer;
> +       private PooledConnectionFactory connectionFactory;
> +       private TopicConnection topicConnection;
> +       private String bindAddress;
> +       private BrokerService service;
> +
> +       protected void setUp() throws Exception {
> +               bindAddress = "tcp://localhost:61616";
> +               super.setUp();
> +               initDurableBroker();
> +               initConnectionFactory();
> +               initTopic();
> +
> +       }
> +
> +       protected void tearDown() throws Exception {
> +           shutdownClient();
> +               connectionFactory.stop();
> +               service.stop();
> +               super.tearDown();
> +       }
> +
> +       private void initConnection() throws JMSException {
> +           System.out.println("Initializing connection");
> +               connection = (TopicConnection) connectionFactory.createConnection(); 
> +               connection.start();
> +       }
> +
> +    public void testTopicIsDurableSmokeTest() throws Exception {
> +        
> +       initClient();
> +       MessageConsumer consumer = createMessageConsumer();
> +       System.out.println("Consuming message");
> +       assertNull(consumer.receive(1));
> +       shutdownClient();
> +       consumer.close();
> +    
> +       sendMessages();
> +       shutdownClient();
> +    
> +       initClient();
> +       consumer = createMessageConsumer();
> +    
> +       System.out.println("Consuming message");
> +       TextMessage answer1 = (TextMessage)consumer.receive(1000);
> +       assertNotNull(answer1);
> +    
> +       consumer.close();
> +    }
> +    
> +    private MessageConsumer createMessageConsumer() throws JMSException {
> +        System.out.println("creating durable subscriber");
> +       return session.createDurableSubscriber(topic, 
> +               TOPIC_NAME, 
> +               "name='value'", 
> +               false);
> +    }
> +
> +       private void initClient() throws JMSException {
> +           System.out.println("Initializing client");
> +           
> +               initConnection();
> +       initSession();
> +       }
> +
> +       private void shutdownClient()
> +                       throws JMSException {
> +           System.out.println("Closing session and connection");
> +        session.close();
> +        connection.close();
> +        session = null;
> +        connection = null;
> +       }
> +
> +       private void sendMessages()
> +                       throws JMSException {
> +           initConnection();
> +
> +               initSession();
> +
> +               System.out.println("Creating producer");
> +               producer = session.createProducer(topic);
> +
> +               sendMessageThatFailsSelection();
> +        
> +               sendMessage(textOfSelectedMsg, "value");
> +       }
> +
> +       private void initSession() throws JMSException {
> +           System.out.println("Initializing session");
> +               session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> +       }
> +
> +       private void sendMessageThatFailsSelection() throws JMSException {
> +               for (int i = 0 ; i < 5 ; i++) {
> +                       String textOfNotSelectedMsg = "Msg_" + i;
> +                       sendMessage(textOfNotSelectedMsg, "not_value");
> +                       System.out.println("#");
> +               }
> +       }
> +
> +       private void sendMessage(
> +                       String msgText,
> +                       String propertyValue) throws JMSException {
> +           System.out.println("Creating message: " + msgText);
> +               TextMessage messageToSelect = session.createTextMessage(msgText);
> +        messageToSelect.setStringProperty("name", propertyValue);
> +        System.out.println("Sending message");
> +               producer.send(messageToSelect);
> +       }
> +
> +       protected void initConnectionFactory() {
> +               ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory();
> +               connectionFactory = new PooledConnectionFactory(activeMqConnectionFactory);
> +       }
> +
> +
> +       private ActiveMQConnectionFactory createActiveMqConnectionFactory() {
> +               ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory();
> +        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
> +        prefetchPolicy.setDurableTopicPrefetch(2);
> +               activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy );
> +        activeMqConnectionFactory.setClientID(CLIENT_ID);
> +               return activeMqConnectionFactory;
> +       }
> +
> +       private void initDurableBroker() throws Exception {
> +               service = new BrokerService();
> +               PersistenceAdapter persistenceAdaptor = service.getPersistenceAdapter();
> +               File file = new File("phills_durable_dir");
> +               persistenceAdaptor.setDirectory(file);
> +               service.setTransportConnectorURIs(new String[] { bindAddress } );
> +               service.setPersistent(true);
> +               service.setUseJmx(true);
> +               service.start();
> +
> +       }
> +
> +       private void initTopic() throws JMSException {
> +               topicConnection = (TopicConnection) connectionFactory.createConnection();
> +               TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
> +               topic = topicSession.createTopic(TOPIC_NAME);
> +       }
> +}
> {code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.