You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by sgliu <sh...@sina.com> on 2006/11/16 09:41:32 UTC
above topic
follow program,why cann't receive data?
How should I modify code?
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;
class HelloWorldProducer : public Runnable {
private:
Connection* connection;
Session* session;
Topic* destination;
MessageProducer* producer;
int numMessages;
public:
HelloWorldProducer( int numMessages ){
connection = NULL;
session = NULL;
destination = NULL;
producer = NULL;
this->numMessages = numMessages;
}
virtual ~HelloWorldProducer(){
cleanup();
}
virtual void run() {
try {
string user,passwd,sID;
user="default";
passwd="";
sID="lsgID";
ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
connection =
connectionFactory->createConnection(user,passwd,sID);
connection->start();
string sss=connection->getClientId();
cout << sss << endl;
session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
destination = session->createTopic( "mytopic" );
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::PERSISTANT );
producer->setTimeToLive(100000000);
string threadIdStr = Integer::toString( Thread::getId() );
// Create a messages
string text = (string)"Hello world! from thread " + threadIdStr;
for( int ix=0; ix<numMessages; ++ix ){
TextMessage* message = session->createTextMessage( text );
string messageID="messageID";
message->setCMSExpiration(10000000000);
message->setCMSMessageId(messageID);
// Tell the producer to send the message
printf( "Sent message from thread %s\n", threadIdStr.c_str() );
producer->send( message );
delete message;
}
}catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
void cleanup(){
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch ( CMSException& e ) {}
destination = NULL;
try{
if( producer != NULL ) delete producer;
}catch ( CMSException& e ) {}
producer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch ( CMSException& e ) {}
try{
if( session != NULL ) delete session;
}catch ( CMSException& e ) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch ( CMSException& e ) {}
connection = NULL;
}
};
class HelloWorldConsumer : public ExceptionListener,
public MessageListener,
public Runnable {
private:
Connection* connection;
Session* session;
Topic* destination;
MessageConsumer* consumer;
long waitMillis;
public:
HelloWorldConsumer( long waitMillis ){
connection = NULL;
session = NULL;
destination = NULL;
consumer = NULL;
this->waitMillis = waitMillis;
}
virtual ~HelloWorldConsumer(){
cleanup();
}
virtual void run() {
try {
string user,passwd,sID;
user="default";
passwd="";
sID="lsgID";
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory(
"tcp://localhost:61613",user,passwd,sID);
// Create a Connection
connection =
connectionFactory->createConnection();//user,passwd,sID);
delete connectionFactory;
connection->start();
connection->setExceptionListener(this);
// Create a Session
session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
destination = session->createTopic( "mytopic" );
consumer = session->createDurableConsumer( destination , user ,
"",false);
consumer->setMessageListener( this );
Thread::sleep( waitMillis );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onMessage( const Message* message ){
try
{
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = textMessage->getText();
printf( "Received: %s\n", text.c_str() );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onException( const CMSException& ex ) {
printf("JMS Exception occured. Shutting down client.\n");
}
private:
void cleanup(){
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) {}
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) {}
consumer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) {}
try{
if( session != NULL ) delete session;
}catch (CMSException& e) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) {}
connection = NULL;
}
};
void Produce()
{
HelloWorldProducer producer( 2 );
Thread producerThread( &producer );
producerThread.start();
producerThread.join();
}
void Consumer()
{
HelloWorldConsumer consumer( 10000 );
Thread consumerThread( &consumer );
consumerThread.start();
consumerThread.join();
}
int main(int argc, char* argv[])
{
Produce();
Consumer();
}
--
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7373622
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
RE: above topic
Posted by sgliu <sh...@sina.com>.
I wish I producer message A, and then I exit the producer program. Then I
start two consumer program(one is C1,the other is C2) at same time.C1 can
receive A , C2 can receive A.
tabish121 wrote:
>
>>
>> How do I achieve durable subscription base on my program?
>>
>
> What is it you are trying to do? What problem are you trying to solve?
>
>
>>
>>
>> sgliu wrote:
>> >
>> > Thanks for you advice.I wil read your tell me content.
>> >
>> >
>> >
>> >
>> > tabish121 wrote:
>> >>
>> >>>
>> >>>
>> >>> follow program,why cann't receive data?
>> >>> How should I modify code?
>> >>
>> >> Looks like you produce before the durable consumer has been
> connected
>> >> once.
>> >>
>> >> See this link:
>> >>
> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
>> >>
>> >> A durable consumer must have connected and then disconnected before
>> >> messages are persisted in anticipation of the consumer
> reconnecting.
>> >> That's why its important to use the same subscription name when
>> >> reconnecting, so that the broker know that the consumer that was
>> >> subscribed as durable is now back and it can deliver the messages
> that
>> >> were stored in its absence.
>> >>
>> >>>
>> >>> #include <activemq/concurrent/Thread.h>
>> >>> #include <activemq/concurrent/Runnable.h>
>> >>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> >>> #include <activemq/util/Integer.h>
>> >>> #include <cms/Connection.h>
>> >>> #include <cms/Session.h>
>> >>> #include <cms/TextMessage.h>
>> >>> #include <cms/ExceptionListener.h>
>> >>> #include <cms/MessageListener.h>
>> >>> #include <stdlib.h>
>> >>>
>> >>> using namespace activemq::core;
>> >>> using namespace activemq::util;
>> >>> using namespace activemq::concurrent;
>> >>> using namespace cms;
>> >>> using namespace std;
>> >>>
>> >>> class HelloWorldProducer : public Runnable {
>> >>> private:
>> >>>
>> >>> Connection* connection;
>> >>> Session* session;
>> >>> Topic* destination;
>> >>> MessageProducer* producer;
>> >>> int numMessages;
>> >>>
>> >>> public:
>> >>>
>> >>> HelloWorldProducer( int numMessages ){
>> >>> connection = NULL;
>> >>> session = NULL;
>> >>> destination = NULL;
>> >>> producer = NULL;
>> >>> this->numMessages = numMessages;
>> >>> }
>> >>>
>> >>> virtual ~HelloWorldProducer(){
>> >>> cleanup();
>> >>> }
>> >>>
>> >>> virtual void run() {
>> >>> try {
>> >>> string user,passwd,sID;
>> >>> user="default";
>> >>> passwd="";
>> >>> sID="lsgID";
>> >>> ActiveMQConnectionFactory* connectionFactory = new
>> >>>
> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>> >>>
>> >>> connection =
>> >>> connectionFactory->createConnection(user,passwd,sID);
>> >>> connection->start();
>> >>>
>> >>> string sss=connection->getClientId();
>> >>> cout << sss << endl;
>> >>>
>> >>> session = connection->createSession(
>> >> Session::AUTO_ACKNOWLEDGE
>> >>> );
>> >>> destination = session->createTopic( "mytopic" );
>> >>>
>> >>> producer = session->createProducer( destination
>> >> );
>> >>> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>> >>>
>> >>> producer->setTimeToLive(100000000);
>> >>> string threadIdStr = Integer::toString(
> Thread::getId() );
>> >>>
>> >>> // Create a messages
>> >>> string text = (string)"Hello world! from thread " +
>> >>> threadIdStr;
>> >>>
>> >>> for( int ix=0; ix<numMessages; ++ix ){
>> >>> TextMessage* message = session->createTextMessage(
>> >> text
>> >>> );
>> >>>
>> >>> string messageID="messageID";
>> >>> message->setCMSExpiration(10000000000);
>> >>> message->setCMSMessageId(messageID);
>> >>>
>> >>> // Tell the producer to send the message
>> >>> printf( "Sent message from thread %s\n",
>> >>> threadIdStr.c_str() );
>> >>> producer->send( message );
>> >>>
>> >>> delete message;
>> >>> }
>> >>>
>> >>> }catch ( CMSException& e ) {
>> >>> e.printStackTrace();
>> >>> }
>> >>> }
>> >>>
>> >>> private:
>> >>>
>> >>> void cleanup(){
>> >>>
>> >>> // Destroy resources.
>> >>> try{
>> >>> if( destination != NULL ) delete destination;
>> >>> }catch ( CMSException& e ) {}
>> >>> destination = NULL;
>> >>>
>> >>> try{
>> >>> if( producer != NULL ) delete producer;
>> >>> }catch ( CMSException& e ) {}
>> >>> producer = NULL;
>> >>>
>> >>> // Close open resources.
>> >>> try{
>> >>> if( session != NULL ) session->close();
>> >>> if( connection != NULL )
> connection->close();
>> >>> }catch ( CMSException& e ) {}
>> >>>
>> >>> try{
>> >>> if( session != NULL ) delete session;
>> >>> }catch ( CMSException& e ) {}
>> >>> session = NULL;
>> >>>
>> >>> try{
>> >>> if( connection != NULL ) delete connection;
>> >>> }catch ( CMSException& e ) {}
>> >>> connection = NULL;
>> >>> }
>> >>> };
>> >>>
>> >>> class HelloWorldConsumer : public ExceptionListener,
>> >>> public MessageListener,
>> >>> public Runnable {
>> >>>
>> >>> private:
>> >>>
>> >>> Connection* connection;
>> >>> Session* session;
>> >>> Topic* destination;
>> >>> MessageConsumer* consumer;
>> >>> long waitMillis;
>> >>>
>> >>> public:
>> >>>
>> >>> HelloWorldConsumer( long waitMillis ){
>> >>> connection = NULL;
>> >>> session = NULL;
>> >>> destination = NULL;
>> >>> consumer = NULL;
>> >>> this->waitMillis = waitMillis;
>> >>> }
>> >>> virtual ~HelloWorldConsumer(){
>> >>> cleanup();
>> >>> }
>> >>>
>> >>> virtual void run() {
>> >>>
>> >>> try {
>> >>>
>> >>> string user,passwd,sID;
>> >>> user="default";
>> >>> passwd="";
>> >>> sID="lsgID";
>> >>> // Create a ConnectionFactory
>> >>> ActiveMQConnectionFactory* connectionFactory =
>> >>> new ActiveMQConnectionFactory(
>> >>> "tcp://localhost:61613",user,passwd,sID);
>> >>>
>> >>> // Create a Connection
>> >>> connection =
>> >>> connectionFactory->createConnection();//user,passwd,sID);
>> >>> delete connectionFactory;
>> >>> connection->start();
>> >>>
>> >>> connection->setExceptionListener(this);
>> >>>
>> >>> // Create a Session
>> >>> session = connection->createSession(
>> >> Session::AUTO_ACKNOWLEDGE
>> >>> );
>> >>> destination = session->createTopic( "mytopic" );
>> >>> consumer = session->createDurableConsumer(
>> >> destination ,
>> >>> user ,
>> >>> "",false);
>> >>>
>> >>> consumer->setMessageListener( this );
>> >>>
>> >>> Thread::sleep( waitMillis );
>> >>>
>> >>> } catch (CMSException& e) {
>> >>> e.printStackTrace();
>> >>> }
>> >>> }
>> >>>
>> >>> virtual void onMessage( const Message* message ){
>> >>>
>> >>> try
>> >>> {
>> >>> const TextMessage* textMessage =
>> >>> dynamic_cast< const TextMessage* >( message );
>> >>> string text = textMessage->getText();
>> >>> printf( "Received: %s\n", text.c_str() );
>> >>> } catch (CMSException& e) {
>> >>> e.printStackTrace();
>> >>> }
>> >>> }
>> >>>
>> >>> virtual void onException( const CMSException& ex ) {
>> >>> printf("JMS Exception occured. Shutting down client.\n");
>> >>> }
>> >>>
>> >>> private:
>> >>>
>> >>> void cleanup(){
>> >>>
>> >>> // Destroy resources.
>> >>> try{
>> >>> if( destination != NULL ) delete destination;
>> >>> }catch (CMSException& e) {}
>> >>> destination = NULL;
>> >>>
>> >>> try{
>> >>> if( consumer != NULL ) delete consumer;
>> >>> }catch (CMSException& e) {}
>> >>> consumer = NULL;
>> >>>
>> >>> // Close open resources.
>> >>> try{
>> >>> if( session != NULL ) session->close();
>> >>> if( connection != NULL ) connection->close();
>> >>> }catch (CMSException& e) {}
>> >>>
>> >>> try{
>> >>> if( session != NULL ) delete session;
>> >>> }catch (CMSException& e) {}
>> >>> session = NULL;
>> >>>
>> >>> try{
>> >>> if( connection != NULL ) delete connection;
>> >>> }catch (CMSException& e) {}
>> >>> connection = NULL;
>> >>> }
>> >>> };
>> >>> void Produce()
>> >>> {
>> >>> HelloWorldProducer producer( 2 );
>> >>> Thread producerThread( &producer );
>> >>> producerThread.start();
>> >>> producerThread.join();
>> >>> }
>> >>> void Consumer()
>> >>> {
>> >>> HelloWorldConsumer consumer( 10000 );
>> >>> Thread consumerThread( &consumer );
>> >>> consumerThread.start();
>> >>> consumerThread.join();
>> >>> }
>> >>> int main(int argc, char* argv[])
>> >>> {
>> >>> Produce();
>> >>> Consumer();
>> >>> }
>> >>> --
>> >>> View this message in context: http://www.nabble.com/above-topic-
>> >>> tf2641566.html#a7373622
>> >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >>
>> >>
>> >>
>> >
>> >
>>
>> --
>> View this message in context: http://www.nabble.com/above-topic-
>> tf2641566.html#a7393074
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
>
--
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7413840
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
RE: above topic
Posted by sgliu <sh...@sina.com>.
How do I achieve durable subscription base on my program?
sgliu wrote:
>
> Thanks for you advice.I wil read your tell me content.
>
>
>
>
> tabish121 wrote:
>>
>>>
>>>
>>> follow program,why cann't receive data?
>>> How should I modify code?
>>
>> Looks like you produce before the durable consumer has been connected
>> once.
>>
>> See this link:
>> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
>>
>> A durable consumer must have connected and then disconnected before
>> messages are persisted in anticipation of the consumer reconnecting.
>> That's why its important to use the same subscription name when
>> reconnecting, so that the broker know that the consumer that was
>> subscribed as durable is now back and it can deliver the messages that
>> were stored in its absence.
>>
>>>
>>> #include <activemq/concurrent/Thread.h>
>>> #include <activemq/concurrent/Runnable.h>
>>> #include <activemq/core/ActiveMQConnectionFactory.h>
>>> #include <activemq/util/Integer.h>
>>> #include <cms/Connection.h>
>>> #include <cms/Session.h>
>>> #include <cms/TextMessage.h>
>>> #include <cms/ExceptionListener.h>
>>> #include <cms/MessageListener.h>
>>> #include <stdlib.h>
>>>
>>> using namespace activemq::core;
>>> using namespace activemq::util;
>>> using namespace activemq::concurrent;
>>> using namespace cms;
>>> using namespace std;
>>>
>>> class HelloWorldProducer : public Runnable {
>>> private:
>>>
>>> Connection* connection;
>>> Session* session;
>>> Topic* destination;
>>> MessageProducer* producer;
>>> int numMessages;
>>>
>>> public:
>>>
>>> HelloWorldProducer( int numMessages ){
>>> connection = NULL;
>>> session = NULL;
>>> destination = NULL;
>>> producer = NULL;
>>> this->numMessages = numMessages;
>>> }
>>>
>>> virtual ~HelloWorldProducer(){
>>> cleanup();
>>> }
>>>
>>> virtual void run() {
>>> try {
>>> string user,passwd,sID;
>>> user="default";
>>> passwd="";
>>> sID="lsgID";
>>> ActiveMQConnectionFactory* connectionFactory = new
>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>>>
>>> connection =
>>> connectionFactory->createConnection(user,passwd,sID);
>>> connection->start();
>>>
>>> string sss=connection->getClientId();
>>> cout << sss << endl;
>>>
>>> session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE
>>> );
>>> destination = session->createTopic( "mytopic" );
>>>
>>> producer = session->createProducer( destination
>> );
>>> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>>
>>> producer->setTimeToLive(100000000);
>>> string threadIdStr = Integer::toString( Thread::getId() );
>>>
>>> // Create a messages
>>> string text = (string)"Hello world! from thread " +
>>> threadIdStr;
>>>
>>> for( int ix=0; ix<numMessages; ++ix ){
>>> TextMessage* message = session->createTextMessage(
>> text
>>> );
>>>
>>> string messageID="messageID";
>>> message->setCMSExpiration(10000000000);
>>> message->setCMSMessageId(messageID);
>>>
>>> // Tell the producer to send the message
>>> printf( "Sent message from thread %s\n",
>>> threadIdStr.c_str() );
>>> producer->send( message );
>>>
>>> delete message;
>>> }
>>>
>>> }catch ( CMSException& e ) {
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> private:
>>>
>>> void cleanup(){
>>>
>>> // Destroy resources.
>>> try{
>>> if( destination != NULL ) delete destination;
>>> }catch ( CMSException& e ) {}
>>> destination = NULL;
>>>
>>> try{
>>> if( producer != NULL ) delete producer;
>>> }catch ( CMSException& e ) {}
>>> producer = NULL;
>>>
>>> // Close open resources.
>>> try{
>>> if( session != NULL ) session->close();
>>> if( connection != NULL ) connection->close();
>>> }catch ( CMSException& e ) {}
>>>
>>> try{
>>> if( session != NULL ) delete session;
>>> }catch ( CMSException& e ) {}
>>> session = NULL;
>>>
>>> try{
>>> if( connection != NULL ) delete connection;
>>> }catch ( CMSException& e ) {}
>>> connection = NULL;
>>> }
>>> };
>>>
>>> class HelloWorldConsumer : public ExceptionListener,
>>> public MessageListener,
>>> public Runnable {
>>>
>>> private:
>>>
>>> Connection* connection;
>>> Session* session;
>>> Topic* destination;
>>> MessageConsumer* consumer;
>>> long waitMillis;
>>>
>>> public:
>>>
>>> HelloWorldConsumer( long waitMillis ){
>>> connection = NULL;
>>> session = NULL;
>>> destination = NULL;
>>> consumer = NULL;
>>> this->waitMillis = waitMillis;
>>> }
>>> virtual ~HelloWorldConsumer(){
>>> cleanup();
>>> }
>>>
>>> virtual void run() {
>>>
>>> try {
>>>
>>> string user,passwd,sID;
>>> user="default";
>>> passwd="";
>>> sID="lsgID";
>>> // Create a ConnectionFactory
>>> ActiveMQConnectionFactory* connectionFactory =
>>> new ActiveMQConnectionFactory(
>>> "tcp://localhost:61613",user,passwd,sID);
>>>
>>> // Create a Connection
>>> connection =
>>> connectionFactory->createConnection();//user,passwd,sID);
>>> delete connectionFactory;
>>> connection->start();
>>>
>>> connection->setExceptionListener(this);
>>>
>>> // Create a Session
>>> session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE
>>> );
>>> destination = session->createTopic( "mytopic" );
>>> consumer = session->createDurableConsumer(
>> destination ,
>>> user ,
>>> "",false);
>>>
>>> consumer->setMessageListener( this );
>>>
>>> Thread::sleep( waitMillis );
>>>
>>> } catch (CMSException& e) {
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> virtual void onMessage( const Message* message ){
>>>
>>> try
>>> {
>>> const TextMessage* textMessage =
>>> dynamic_cast< const TextMessage* >( message );
>>> string text = textMessage->getText();
>>> printf( "Received: %s\n", text.c_str() );
>>> } catch (CMSException& e) {
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> virtual void onException( const CMSException& ex ) {
>>> printf("JMS Exception occured. Shutting down client.\n");
>>> }
>>>
>>> private:
>>>
>>> void cleanup(){
>>>
>>> // Destroy resources.
>>> try{
>>> if( destination != NULL ) delete destination;
>>> }catch (CMSException& e) {}
>>> destination = NULL;
>>>
>>> try{
>>> if( consumer != NULL ) delete consumer;
>>> }catch (CMSException& e) {}
>>> consumer = NULL;
>>>
>>> // Close open resources.
>>> try{
>>> if( session != NULL ) session->close();
>>> if( connection != NULL ) connection->close();
>>> }catch (CMSException& e) {}
>>>
>>> try{
>>> if( session != NULL ) delete session;
>>> }catch (CMSException& e) {}
>>> session = NULL;
>>>
>>> try{
>>> if( connection != NULL ) delete connection;
>>> }catch (CMSException& e) {}
>>> connection = NULL;
>>> }
>>> };
>>> void Produce()
>>> {
>>> HelloWorldProducer producer( 2 );
>>> Thread producerThread( &producer );
>>> producerThread.start();
>>> producerThread.join();
>>> }
>>> void Consumer()
>>> {
>>> HelloWorldConsumer consumer( 10000 );
>>> Thread consumerThread( &consumer );
>>> consumerThread.start();
>>> consumerThread.join();
>>> }
>>> int main(int argc, char* argv[])
>>> {
>>> Produce();
>>> Consumer();
>>> }
>>> --
>>> View this message in context: http://www.nabble.com/above-topic-
>>> tf2641566.html#a7373622
>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
>>
>
>
--
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7393074
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
RE: above topic
Posted by sgliu <sh...@sina.com>.
Thanks for you advice.I wil read your tell me content.
tabish121 wrote:
>
>>
>>
>> follow program,why cann't receive data?
>> How should I modify code?
>
> Looks like you produce before the durable consumer has been connected
> once.
>
> See this link:
> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
>
> A durable consumer must have connected and then disconnected before
> messages are persisted in anticipation of the consumer reconnecting.
> That's why its important to use the same subscription name when
> reconnecting, so that the broker know that the consumer that was
> subscribed as durable is now back and it can deliver the messages that
> were stored in its absence.
>
>>
>> #include <activemq/concurrent/Thread.h>
>> #include <activemq/concurrent/Runnable.h>
>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> #include <activemq/util/Integer.h>
>> #include <cms/Connection.h>
>> #include <cms/Session.h>
>> #include <cms/TextMessage.h>
>> #include <cms/ExceptionListener.h>
>> #include <cms/MessageListener.h>
>> #include <stdlib.h>
>>
>> using namespace activemq::core;
>> using namespace activemq::util;
>> using namespace activemq::concurrent;
>> using namespace cms;
>> using namespace std;
>>
>> class HelloWorldProducer : public Runnable {
>> private:
>>
>> Connection* connection;
>> Session* session;
>> Topic* destination;
>> MessageProducer* producer;
>> int numMessages;
>>
>> public:
>>
>> HelloWorldProducer( int numMessages ){
>> connection = NULL;
>> session = NULL;
>> destination = NULL;
>> producer = NULL;
>> this->numMessages = numMessages;
>> }
>>
>> virtual ~HelloWorldProducer(){
>> cleanup();
>> }
>>
>> virtual void run() {
>> try {
>> string user,passwd,sID;
>> user="default";
>> passwd="";
>> sID="lsgID";
>> ActiveMQConnectionFactory* connectionFactory = new
>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>>
>> connection =
>> connectionFactory->createConnection(user,passwd,sID);
>> connection->start();
>>
>> string sss=connection->getClientId();
>> cout << sss << endl;
>>
>> session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE
>> );
>> destination = session->createTopic( "mytopic" );
>>
>> producer = session->createProducer( destination
> );
>> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>
>> producer->setTimeToLive(100000000);
>> string threadIdStr = Integer::toString( Thread::getId() );
>>
>> // Create a messages
>> string text = (string)"Hello world! from thread " +
>> threadIdStr;
>>
>> for( int ix=0; ix<numMessages; ++ix ){
>> TextMessage* message = session->createTextMessage(
> text
>> );
>>
>> string messageID="messageID";
>> message->setCMSExpiration(10000000000);
>> message->setCMSMessageId(messageID);
>>
>> // Tell the producer to send the message
>> printf( "Sent message from thread %s\n",
>> threadIdStr.c_str() );
>> producer->send( message );
>>
>> delete message;
>> }
>>
>> }catch ( CMSException& e ) {
>> e.printStackTrace();
>> }
>> }
>>
>> private:
>>
>> void cleanup(){
>>
>> // Destroy resources.
>> try{
>> if( destination != NULL ) delete destination;
>> }catch ( CMSException& e ) {}
>> destination = NULL;
>>
>> try{
>> if( producer != NULL ) delete producer;
>> }catch ( CMSException& e ) {}
>> producer = NULL;
>>
>> // Close open resources.
>> try{
>> if( session != NULL ) session->close();
>> if( connection != NULL ) connection->close();
>> }catch ( CMSException& e ) {}
>>
>> try{
>> if( session != NULL ) delete session;
>> }catch ( CMSException& e ) {}
>> session = NULL;
>>
>> try{
>> if( connection != NULL ) delete connection;
>> }catch ( CMSException& e ) {}
>> connection = NULL;
>> }
>> };
>>
>> class HelloWorldConsumer : public ExceptionListener,
>> public MessageListener,
>> public Runnable {
>>
>> private:
>>
>> Connection* connection;
>> Session* session;
>> Topic* destination;
>> MessageConsumer* consumer;
>> long waitMillis;
>>
>> public:
>>
>> HelloWorldConsumer( long waitMillis ){
>> connection = NULL;
>> session = NULL;
>> destination = NULL;
>> consumer = NULL;
>> this->waitMillis = waitMillis;
>> }
>> virtual ~HelloWorldConsumer(){
>> cleanup();
>> }
>>
>> virtual void run() {
>>
>> try {
>>
>> string user,passwd,sID;
>> user="default";
>> passwd="";
>> sID="lsgID";
>> // Create a ConnectionFactory
>> ActiveMQConnectionFactory* connectionFactory =
>> new ActiveMQConnectionFactory(
>> "tcp://localhost:61613",user,passwd,sID);
>>
>> // Create a Connection
>> connection =
>> connectionFactory->createConnection();//user,passwd,sID);
>> delete connectionFactory;
>> connection->start();
>>
>> connection->setExceptionListener(this);
>>
>> // Create a Session
>> session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE
>> );
>> destination = session->createTopic( "mytopic" );
>> consumer = session->createDurableConsumer(
> destination ,
>> user ,
>> "",false);
>>
>> consumer->setMessageListener( this );
>>
>> Thread::sleep( waitMillis );
>>
>> } catch (CMSException& e) {
>> e.printStackTrace();
>> }
>> }
>>
>> virtual void onMessage( const Message* message ){
>>
>> try
>> {
>> const TextMessage* textMessage =
>> dynamic_cast< const TextMessage* >( message );
>> string text = textMessage->getText();
>> printf( "Received: %s\n", text.c_str() );
>> } catch (CMSException& e) {
>> e.printStackTrace();
>> }
>> }
>>
>> virtual void onException( const CMSException& ex ) {
>> printf("JMS Exception occured. Shutting down client.\n");
>> }
>>
>> private:
>>
>> void cleanup(){
>>
>> // Destroy resources.
>> try{
>> if( destination != NULL ) delete destination;
>> }catch (CMSException& e) {}
>> destination = NULL;
>>
>> try{
>> if( consumer != NULL ) delete consumer;
>> }catch (CMSException& e) {}
>> consumer = NULL;
>>
>> // Close open resources.
>> try{
>> if( session != NULL ) session->close();
>> if( connection != NULL ) connection->close();
>> }catch (CMSException& e) {}
>>
>> try{
>> if( session != NULL ) delete session;
>> }catch (CMSException& e) {}
>> session = NULL;
>>
>> try{
>> if( connection != NULL ) delete connection;
>> }catch (CMSException& e) {}
>> connection = NULL;
>> }
>> };
>> void Produce()
>> {
>> HelloWorldProducer producer( 2 );
>> Thread producerThread( &producer );
>> producerThread.start();
>> producerThread.join();
>> }
>> void Consumer()
>> {
>> HelloWorldConsumer consumer( 10000 );
>> Thread consumerThread( &consumer );
>> consumerThread.start();
>> consumerThread.join();
>> }
>> int main(int argc, char* argv[])
>> {
>> Produce();
>> Consumer();
>> }
>> --
>> View this message in context: http://www.nabble.com/above-topic-
>> tf2641566.html#a7373622
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
>
--
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7392444
Sent from the ActiveMQ - User mailing list archive at Nabble.com.