You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Yi Pan <yp...@yahoo-inc.com> on 2013/06/05 20:25:06 UTC

Issues with durable subscriber and VMPendingDurableSubscriberMessageStoragePolicy

Hi, all,

Has anyone encountered a problem with
VMPendingDurableSubscriberMessageStoragePolicy that it loses persistent
messages when the subscriber re-connects?

We have the following test case fails with
VMPendingDurableSubscriberMessageStoragePolicy set as default destination
policy:			
1) publish persistent messages:
              void publish(String brokerUrl) {
                  try {
                        ConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
                        connection = factory.createTopicConnection();
			connection.setClientID(CLIENT_ID);

			/**
			 * Create a JMS session
			 */
			TopicSession session = connection.createTopicSession(false,
					Session.AUTO_ACKNOWLEDGE);

			/**
			 * Fully Qualified Topic Name
			 */
			String fullyQualifiedTopicName = this.topicName;

			/**
			 * Get the topic instance from the topic name.
			 */
			Topic topic = session.createTopic(fullyQualifiedTopicName);

			/**
			 * Create a topic subscriber to the topic.
			 */
			TopicSubscriber ts = session.createDurableSubscriber(topic,
					DURABLE_SUB_NAME);

			/**
			 * Create a publisher to publish messages to the topic
			 */
			TopicPublisher publisher = session.createPublisher(topic);

			int deliveryMode = DeliveryMode.PERSISTENT;

			/**
			 * Send and receive 10 messages to the topic.
			 */
			for (int i = 0; i < 10; i++) {
				TextMessage tm = session.createTextMessage("msg-" + i);

				LOG.info("publishing msg to broker, msg=" + tm.getText());

				publisher.publish(topic, tm, deliveryMode, 4, 0);
			}

			Thread.sleep(1000);

			publisher.close();
			session.close();

		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} finally {
			try {

				connection.close();
			} catch (Exception e) {
				;
			}
		}
           }
2) after the messages are published, subscriber connects to receive the
previously published messages:
    void consume(String brokerUrl) {
		try {
                        ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
			/**
			 * Create a topic connection from the obtained connection factory.
			 */
			connection = factory.createTopicConnection();

			connection.setClientID(CLIENT_ID);

			/**
			 * Create a JMS session
			 */
			TopicSession session = connection.createTopicSession(false,
					Session.AUTO_ACKNOWLEDGE);

			/**
			 * Fully Qualified Topic Name
			 */
			String fullyQualifiedTopicName = this.topicLocalName;

			/**
			 * Get the topic instance from the topic name.
			 */
			Topic topic = session.createTopic(fullyQualifiedTopicName);

			/**
			 * Create a topic subscriber to the topic.
			 */
			TopicSubscriber ts = session.createDurableSubscriber(topic,
					DURABLE_SUB_NAME);

			/**
			 * Start the connection - message will be delivered to the
			 * subscribers.
			 */
			connection.start();

			/**
			 * Send and receive 10 messages to the topic.
			 */
			for (int i = 0; i < 10; i++) {

				TextMessage tm2 = (TextMessage) ts.receive();

				if (tm2 == null) {
					throw new RuntimeException("message validation failed.");
				}

				LOG.info("receive msg from broker, msg received="
						+ tm2.getText());

			}

			Thread.sleep(1000);

		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} finally {
			try {
				connection.close();
			} catch (Exception e) {
				;
			}
		}
    }

We noticed that when the default destination policy is set to
VMPendingDurableSubscriberMessageStoragePolicy, the above consume() function
fails and stuck at the blocking call: 
               TextMessage tm2 = (TextMessage) ts.receive();

Has anyone encountered this issue before? Our version of ActiveMQ is 5.7.0

Thanks a lot!

-Yi



--
View this message in context: http://activemq.2283324.n4.nabble.com/Issues-with-durable-subscriber-and-VMPendingDurableSubscriberMessageStoragePolicy-tp4667921.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Issues with durable subscriber and VMPendingDurableSubscriberMessageStoragePolicy

Posted by Christian Posta <ch...@gmail.com>.
Tim fixed some issues around the cursors for durable subs.. can you try
your tests on a recent snapshot?


On Wed, Jun 5, 2013 at 11:25 AM, Yi Pan <yp...@yahoo-inc.com> wrote:

> Hi, all,
>
> Has anyone encountered a problem with
> VMPendingDurableSubscriberMessageStoragePolicy that it loses persistent
> messages when the subscriber re-connects?
>
> We have the following test case fails with
> VMPendingDurableSubscriberMessageStoragePolicy set as default destination
> policy:
> 1) publish persistent messages:
>               void publish(String brokerUrl) {
>                   try {
>                         ConnectionFactory factory = new
> ActiveMQConnectionFactory(brokerUrl);
>                         connection = factory.createTopicConnection();
>                         connection.setClientID(CLIENT_ID);
>
>                         /**
>                          * Create a JMS session
>                          */
>                         TopicSession session =
> connection.createTopicSession(false,
>                                         Session.AUTO_ACKNOWLEDGE);
>
>                         /**
>                          * Fully Qualified Topic Name
>                          */
>                         String fullyQualifiedTopicName = this.topicName;
>
>                         /**
>                          * Get the topic instance from the topic name.
>                          */
>                         Topic topic =
> session.createTopic(fullyQualifiedTopicName);
>
>                         /**
>                          * Create a topic subscriber to the topic.
>                          */
>                         TopicSubscriber ts =
> session.createDurableSubscriber(topic,
>                                         DURABLE_SUB_NAME);
>
>                         /**
>                          * Create a publisher to publish messages to the
> topic
>                          */
>                         TopicPublisher publisher =
> session.createPublisher(topic);
>
>                         int deliveryMode = DeliveryMode.PERSISTENT;
>
>                         /**
>                          * Send and receive 10 messages to the topic.
>                          */
>                         for (int i = 0; i < 10; i++) {
>                                 TextMessage tm =
> session.createTextMessage("msg-" + i);
>
>                                 LOG.info("publishing msg to broker, msg="
> + tm.getText());
>
>                                 publisher.publish(topic, tm, deliveryMode,
> 4, 0);
>                         }
>
>                         Thread.sleep(1000);
>
>                         publisher.close();
>                         session.close();
>
>                 } catch (Exception e) {
>                         LOG.error(e.getMessage(), e);
>                 } finally {
>                         try {
>
>                                 connection.close();
>                         } catch (Exception e) {
>                                 ;
>                         }
>                 }
>            }
> 2) after the messages are published, subscriber connects to receive the
> previously published messages:
>     void consume(String brokerUrl) {
>                 try {
>                         ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory(brokerUrl);
>                         /**
>                          * Create a topic connection from the obtained
> connection factory.
>                          */
>                         connection = factory.createTopicConnection();
>
>                         connection.setClientID(CLIENT_ID);
>
>                         /**
>                          * Create a JMS session
>                          */
>                         TopicSession session =
> connection.createTopicSession(false,
>                                         Session.AUTO_ACKNOWLEDGE);
>
>                         /**
>                          * Fully Qualified Topic Name
>                          */
>                         String fullyQualifiedTopicName =
> this.topicLocalName;
>
>                         /**
>                          * Get the topic instance from the topic name.
>                          */
>                         Topic topic =
> session.createTopic(fullyQualifiedTopicName);
>
>                         /**
>                          * Create a topic subscriber to the topic.
>                          */
>                         TopicSubscriber ts =
> session.createDurableSubscriber(topic,
>                                         DURABLE_SUB_NAME);
>
>                         /**
>                          * Start the connection - message will be
> delivered to the
>                          * subscribers.
>                          */
>                         connection.start();
>
>                         /**
>                          * Send and receive 10 messages to the topic.
>                          */
>                         for (int i = 0; i < 10; i++) {
>
>                                 TextMessage tm2 = (TextMessage)
> ts.receive();
>
>                                 if (tm2 == null) {
>                                         throw new
> RuntimeException("message validation failed.");
>                                 }
>
>                                 LOG.info("receive msg from broker, msg
> received="
>                                                 + tm2.getText());
>
>                         }
>
>                         Thread.sleep(1000);
>
>                 } catch (Exception e) {
>                         LOG.error(e.getMessage(), e);
>                 } finally {
>                         try {
>                                 connection.close();
>                         } catch (Exception e) {
>                                 ;
>                         }
>                 }
>     }
>
> We noticed that when the default destination policy is set to
> VMPendingDurableSubscriberMessageStoragePolicy, the above consume()
> function
> fails and stuck at the blocking call:
>                TextMessage tm2 = (TextMessage) ts.receive();
>
> Has anyone encountered this issue before? Our version of ActiveMQ is 5.7.0
>
> Thanks a lot!
>
> -Yi
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Issues-with-durable-subscriber-and-VMPendingDurableSubscriberMessageStoragePolicy-tp4667921.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta