You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Yi Pan <yp...@yahoo-inc.com> on 2013/06/05 20:25:06 UTC
Issues with durable subscriber and
VMPendingDurableSubscriberMessageStoragePolicy
Hi, all,
Has anyone encountered a problem with
VMPendingDurableSubscriberMessageStoragePolicy that it loses persistent
messages when the subscriber re-connects?
We have the following test case fails with
VMPendingDurableSubscriberMessageStoragePolicy set as default destination
policy:
1) publish persistent messages:
void publish(String brokerUrl) {
try {
ConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
connection = factory.createTopicConnection();
connection.setClientID(CLIENT_ID);
/**
* Create a JMS session
*/
TopicSession session = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
/**
* Fully Qualified Topic Name
*/
String fullyQualifiedTopicName = this.topicName;
/**
* Get the topic instance from the topic name.
*/
Topic topic = session.createTopic(fullyQualifiedTopicName);
/**
* Create a topic subscriber to the topic.
*/
TopicSubscriber ts = session.createDurableSubscriber(topic,
DURABLE_SUB_NAME);
/**
* Create a publisher to publish messages to the topic
*/
TopicPublisher publisher = session.createPublisher(topic);
int deliveryMode = DeliveryMode.PERSISTENT;
/**
* Send and receive 10 messages to the topic.
*/
for (int i = 0; i < 10; i++) {
TextMessage tm = session.createTextMessage("msg-" + i);
LOG.info("publishing msg to broker, msg=" + tm.getText());
publisher.publish(topic, tm, deliveryMode, 4, 0);
}
Thread.sleep(1000);
publisher.close();
session.close();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
try {
connection.close();
} catch (Exception e) {
;
}
}
}
2) after the messages are published, subscriber connects to receive the
previously published messages:
void consume(String brokerUrl) {
try {
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
/**
* Create a topic connection from the obtained connection factory.
*/
connection = factory.createTopicConnection();
connection.setClientID(CLIENT_ID);
/**
* Create a JMS session
*/
TopicSession session = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
/**
* Fully Qualified Topic Name
*/
String fullyQualifiedTopicName = this.topicLocalName;
/**
* Get the topic instance from the topic name.
*/
Topic topic = session.createTopic(fullyQualifiedTopicName);
/**
* Create a topic subscriber to the topic.
*/
TopicSubscriber ts = session.createDurableSubscriber(topic,
DURABLE_SUB_NAME);
/**
* Start the connection - message will be delivered to the
* subscribers.
*/
connection.start();
/**
* Send and receive 10 messages to the topic.
*/
for (int i = 0; i < 10; i++) {
TextMessage tm2 = (TextMessage) ts.receive();
if (tm2 == null) {
throw new RuntimeException("message validation failed.");
}
LOG.info("receive msg from broker, msg received="
+ tm2.getText());
}
Thread.sleep(1000);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
try {
connection.close();
} catch (Exception e) {
;
}
}
}
We noticed that when the default destination policy is set to
VMPendingDurableSubscriberMessageStoragePolicy, the above consume() function
fails and stuck at the blocking call:
TextMessage tm2 = (TextMessage) ts.receive();
Has anyone encountered this issue before? Our version of ActiveMQ is 5.7.0
Thanks a lot!
-Yi
--
View this message in context: http://activemq.2283324.n4.nabble.com/Issues-with-durable-subscriber-and-VMPendingDurableSubscriberMessageStoragePolicy-tp4667921.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
Re: Issues with durable subscriber and VMPendingDurableSubscriberMessageStoragePolicy
Posted by Christian Posta <ch...@gmail.com>.
Tim fixed some issues around the cursors for durable subs.. can you try
your tests on a recent snapshot?
On Wed, Jun 5, 2013 at 11:25 AM, Yi Pan <yp...@yahoo-inc.com> wrote:
> Hi, all,
>
> Has anyone encountered a problem with
> VMPendingDurableSubscriberMessageStoragePolicy that it loses persistent
> messages when the subscriber re-connects?
>
> We have the following test case fails with
> VMPendingDurableSubscriberMessageStoragePolicy set as default destination
> policy:
> 1) publish persistent messages:
> void publish(String brokerUrl) {
> try {
> ConnectionFactory factory = new
> ActiveMQConnectionFactory(brokerUrl);
> connection = factory.createTopicConnection();
> connection.setClientID(CLIENT_ID);
>
> /**
> * Create a JMS session
> */
> TopicSession session =
> connection.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
> /**
> * Fully Qualified Topic Name
> */
> String fullyQualifiedTopicName = this.topicName;
>
> /**
> * Get the topic instance from the topic name.
> */
> Topic topic =
> session.createTopic(fullyQualifiedTopicName);
>
> /**
> * Create a topic subscriber to the topic.
> */
> TopicSubscriber ts =
> session.createDurableSubscriber(topic,
> DURABLE_SUB_NAME);
>
> /**
> * Create a publisher to publish messages to the
> topic
> */
> TopicPublisher publisher =
> session.createPublisher(topic);
>
> int deliveryMode = DeliveryMode.PERSISTENT;
>
> /**
> * Send and receive 10 messages to the topic.
> */
> for (int i = 0; i < 10; i++) {
> TextMessage tm =
> session.createTextMessage("msg-" + i);
>
> LOG.info("publishing msg to broker, msg="
> + tm.getText());
>
> publisher.publish(topic, tm, deliveryMode,
> 4, 0);
> }
>
> Thread.sleep(1000);
>
> publisher.close();
> session.close();
>
> } catch (Exception e) {
> LOG.error(e.getMessage(), e);
> } finally {
> try {
>
> connection.close();
> } catch (Exception e) {
> ;
> }
> }
> }
> 2) after the messages are published, subscriber connects to receive the
> previously published messages:
> void consume(String brokerUrl) {
> try {
> ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory(brokerUrl);
> /**
> * Create a topic connection from the obtained
> connection factory.
> */
> connection = factory.createTopicConnection();
>
> connection.setClientID(CLIENT_ID);
>
> /**
> * Create a JMS session
> */
> TopicSession session =
> connection.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
> /**
> * Fully Qualified Topic Name
> */
> String fullyQualifiedTopicName =
> this.topicLocalName;
>
> /**
> * Get the topic instance from the topic name.
> */
> Topic topic =
> session.createTopic(fullyQualifiedTopicName);
>
> /**
> * Create a topic subscriber to the topic.
> */
> TopicSubscriber ts =
> session.createDurableSubscriber(topic,
> DURABLE_SUB_NAME);
>
> /**
> * Start the connection - message will be
> delivered to the
> * subscribers.
> */
> connection.start();
>
> /**
> * Send and receive 10 messages to the topic.
> */
> for (int i = 0; i < 10; i++) {
>
> TextMessage tm2 = (TextMessage)
> ts.receive();
>
> if (tm2 == null) {
> throw new
> RuntimeException("message validation failed.");
> }
>
> LOG.info("receive msg from broker, msg
> received="
> + tm2.getText());
>
> }
>
> Thread.sleep(1000);
>
> } catch (Exception e) {
> LOG.error(e.getMessage(), e);
> } finally {
> try {
> connection.close();
> } catch (Exception e) {
> ;
> }
> }
> }
>
> We noticed that when the default destination policy is set to
> VMPendingDurableSubscriberMessageStoragePolicy, the above consume()
> function
> fails and stuck at the blocking call:
> TextMessage tm2 = (TextMessage) ts.receive();
>
> Has anyone encountered this issue before? Our version of ActiveMQ is 5.7.0
>
> Thanks a lot!
>
> -Yi
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Issues-with-durable-subscriber-and-VMPendingDurableSubscriberMessageStoragePolicy-tp4667921.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
--
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta