You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by sgliu <sh...@sina.com> on 2006/11/16 09:41:32 UTC

above topic

With the following program, why can't I receive any data?
How should I modify the code?

#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>

using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;

class HelloWorldProducer : public Runnable {
private:
	
	Connection* connection;
	Session* session;
	Topic* destination;
	MessageProducer* producer;
	int numMessages;

public:
	
	HelloWorldProducer( int numMessages ){
		connection = NULL;
    	session = NULL;
    	destination = NULL;
    	producer = NULL;
    	this->numMessages = numMessages;
	}
	
	virtual ~HelloWorldProducer(){
		cleanup();
	}
	
    virtual void run() {
        try {
			string user,passwd,sID;
			user="default";
			passwd="";
			sID="lsgID";
            ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);

            connection =
connectionFactory->createConnection(user,passwd,sID);
            connection->start();

			string sss=connection->getClientId();
			cout << sss << endl;

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
			destination = session->createTopic( "mytopic" );

			producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::PERSISTANT );
            
			producer->setTimeToLive(100000000);
            string threadIdStr = Integer::toString( Thread::getId() );
            
            // Create a messages
            string text = (string)"Hello world! from thread " + threadIdStr;
            
            for( int ix=0; ix<numMessages; ++ix ){
	            TextMessage* message = session->createTextMessage( text );

				string messageID="messageID";
				message->setCMSExpiration(10000000000);
				message->setCMSMessageId(messageID);

    	        // Tell the producer to send the message
        	    printf( "Sent message from thread %s\n", threadIdStr.c_str() );
        		producer->send( message );
            	
            	delete message;
            }
			
        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }
    
private:

    void cleanup(){
    				
			// Destroy resources.
			try{                        
            	if( destination != NULL ) delete destination;
			}catch ( CMSException& e ) {}
			destination = NULL;
			
			try{
	            if( producer != NULL ) delete producer;
			}catch ( CMSException& e ) {}
			producer = NULL;
			
    		// Close open resources.
    		try{
    			if( session != NULL ) session->close();
    			if( connection != NULL ) connection->close();
			}catch ( CMSException& e ) {}

			try{
            	if( session != NULL ) delete session;
			}catch ( CMSException& e ) {}
			session = NULL;
			
            try{
            	if( connection != NULL ) delete connection;
			}catch ( CMSException& e ) {}
    		connection = NULL;
    }
};

class HelloWorldConsumer : public ExceptionListener, 
                           public MessageListener,
                           public Runnable {
	
private:
	
	Connection* connection;
	Session* session;
	Topic* destination;
	MessageConsumer* consumer;
	long waitMillis;
		
public: 

	HelloWorldConsumer( long waitMillis ){
		connection = NULL;
    	session = NULL;
    	destination = NULL;
    	consumer = NULL;
    	this->waitMillis = waitMillis;
	}
    virtual ~HelloWorldConsumer(){    	
    	cleanup();
    }
    
    virtual void run() {
    	    	
        try {

			string user,passwd,sID;
			user="default";
			passwd="";
			sID="lsgID";
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory = 
                new ActiveMQConnectionFactory(
"tcp://localhost:61613",user,passwd,sID);

            // Create a Connection
            connection =
connectionFactory->createConnection();//user,passwd,sID);
            delete connectionFactory;
            connection->start();
            
            connection->setExceptionListener(this);

            // Create a Session
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
			destination = session->createTopic( "mytopic" );
			consumer = session->createDurableConsumer( destination , user ,
"",false);
            
            consumer->setMessageListener( this );
            
            Thread::sleep( waitMillis );		
            
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
    
    virtual void onMessage( const Message* message ){
    	
        try
        {
    	    const TextMessage* textMessage = 
                dynamic_cast< const TextMessage* >( message );
            string text = textMessage->getText();
            printf( "Received: %s\n", text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    virtual void onException( const CMSException& ex ) {
        printf("JMS Exception occured.  Shutting down client.\n");
    }
    
private:

    void cleanup(){
    	
		// Destroy resources.
		try{                        
        	if( destination != NULL ) delete destination;
		}catch (CMSException& e) {}
		destination = NULL;
		
		try{
            if( consumer != NULL ) delete consumer;
		}catch (CMSException& e) {}
		consumer = NULL;
		
		// Close open resources.
		try{
			if( session != NULL ) session->close();
			if( connection != NULL ) connection->close();
		}catch (CMSException& e) {}
		
        try{
        	if( session != NULL ) delete session;
		}catch (CMSException& e) {}
		session = NULL;
		
		try{
        	if( connection != NULL ) delete connection;
		}catch (CMSException& e) {}
		connection = NULL;
    }
};
  void Produce() 
 { 
     HelloWorldProducer producer( 2 ); 
     Thread producerThread( &producer ); 
     producerThread.start(); 
     producerThread.join(); 
 } 
 void Consumer() 
 { 
      HelloWorldConsumer consumer( 10000 ); 
      Thread consumerThread( &consumer ); 
      consumerThread.start(); 
      consumerThread.join(); 
 } 
 // Entry point.
 //
 // A durable consumer must have connected (and disconnected) once before
 // the broker stores topic messages for it. Producing first therefore
 // delivers nothing to a consumer that subscribes afterwards. The order
 // is: register the durable subscription, produce, then reconnect with
 // the same subscription to receive the stored messages.
 int main(int argc, char* argv[])
 {
      Consumer();   // first connect: registers the durable subscription
      Produce();    // messages are now retained for the offline subscriber
      Consumer();   // reconnect: the retained messages are delivered
      return 0;
 }
-- 
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7373622
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


RE: above topic

Posted by sgliu <sh...@sina.com>.
I want to produce message A and then exit the producer program. Then I
start two consumer programs (C1 and C2) at the same time. C1 should
receive A, and C2 should also receive A.



tabish121 wrote:
> 
>> 
>> How do I achieve durable subscription base on my program?
>> 
> 
> What is it you are trying to do?  What problem are you trying to solve?
> 
> 
>> 
>> 
>> sgliu wrote:
>> >
>> > Thanks for you advice.I wil read your tell me content.
>> >
>> >
>> >
>> >
>> > tabish121 wrote:
>> >>
>> >>>
>> >>>
>> >>> follow program,why cann't receive data?
>> >>> How should I modify code?
>> >>
>> >> Looks like you produce before the durable consumer has been
> connected
>> >> once.
>> >>
>> >> See this link:
>> >>
> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
>> >>
>> >> A durable consumer must have connected and then disconnected before
>> >> messages are persisted in anticipation of the consumer
> reconnecting.
>> >> That's why its important to use the same subscription name when
>> >> reconnecting, so that the broker know that the consumer that was
>> >> subscribed as durable is now back and it can deliver the messages
> that
>> >> were stored in its absence.
>> >>
>> >>>
>> >>> #include <activemq/concurrent/Thread.h>
>> >>> #include <activemq/concurrent/Runnable.h>
>> >>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> >>> #include <activemq/util/Integer.h>
>> >>> #include <cms/Connection.h>
>> >>> #include <cms/Session.h>
>> >>> #include <cms/TextMessage.h>
>> >>> #include <cms/ExceptionListener.h>
>> >>> #include <cms/MessageListener.h>
>> >>> #include <stdlib.h>
>> >>>
>> >>> using namespace activemq::core;
>> >>> using namespace activemq::util;
>> >>> using namespace activemq::concurrent;
>> >>> using namespace cms;
>> >>> using namespace std;
>> >>>
>> >>> class HelloWorldProducer : public Runnable {
>> >>> private:
>> >>>
>> >>> 	Connection* connection;
>> >>> 	Session* session;
>> >>> 	Topic* destination;
>> >>> 	MessageProducer* producer;
>> >>> 	int numMessages;
>> >>>
>> >>> public:
>> >>>
>> >>> 	HelloWorldProducer( int numMessages ){
>> >>> 		connection = NULL;
>> >>>     	session = NULL;
>> >>>     	destination = NULL;
>> >>>     	producer = NULL;
>> >>>     	this->numMessages = numMessages;
>> >>> 	}
>> >>>
>> >>> 	virtual ~HelloWorldProducer(){
>> >>> 		cleanup();
>> >>> 	}
>> >>>
>> >>>     virtual void run() {
>> >>>         try {
>> >>> 			string user,passwd,sID;
>> >>> 			user="default";
>> >>> 			passwd="";
>> >>> 			sID="lsgID";
>> >>>             ActiveMQConnectionFactory* connectionFactory = new
>> >>>
> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>> >>>
>> >>>             connection =
>> >>> connectionFactory->createConnection(user,passwd,sID);
>> >>>             connection->start();
>> >>>
>> >>> 			string sss=connection->getClientId();
>> >>> 			cout << sss << endl;
>> >>>
>> >>>             session = connection->createSession(
>> >> Session::AUTO_ACKNOWLEDGE
>> >>> );
>> >>> 			destination = session->createTopic( "mytopic" );
>> >>>
>> >>> 			producer = session->createProducer( destination
>> >> );
>> >>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>> >>>
>> >>> 			producer->setTimeToLive(100000000);
>> >>>             string threadIdStr = Integer::toString(
> Thread::getId() );
>> >>>
>> >>>             // Create a messages
>> >>>             string text = (string)"Hello world! from thread " +
>> >>> threadIdStr;
>> >>>
>> >>>             for( int ix=0; ix<numMessages; ++ix ){
>> >>> 	            TextMessage* message = session->createTextMessage(
>> >> text
>> >>> );
>> >>>
>> >>> 				string messageID="messageID";
>> >>> 				message->setCMSExpiration(10000000000);
>> >>> 				message->setCMSMessageId(messageID);
>> >>>
>> >>>     	        // Tell the producer to send the message
>> >>>         	    printf( "Sent message from thread %s\n",
>> >>> threadIdStr.c_str() );
>> >>>         		producer->send( message );
>> >>>
>> >>>             	delete message;
>> >>>             }
>> >>>
>> >>>         }catch ( CMSException& e ) {
>> >>>             e.printStackTrace();
>> >>>         }
>> >>>     }
>> >>>
>> >>> private:
>> >>>
>> >>>     void cleanup(){
>> >>>
>> >>> 			// Destroy resources.
>> >>> 			try{
>> >>>             	if( destination != NULL ) delete destination;
>> >>> 			}catch ( CMSException& e ) {}
>> >>> 			destination = NULL;
>> >>>
>> >>> 			try{
>> >>> 	            if( producer != NULL ) delete producer;
>> >>> 			}catch ( CMSException& e ) {}
>> >>> 			producer = NULL;
>> >>>
>> >>>     		// Close open resources.
>> >>>     		try{
>> >>>     			if( session != NULL ) session->close();
>> >>>     			if( connection != NULL )
> connection->close();
>> >>> 			}catch ( CMSException& e ) {}
>> >>>
>> >>> 			try{
>> >>>             	if( session != NULL ) delete session;
>> >>> 			}catch ( CMSException& e ) {}
>> >>> 			session = NULL;
>> >>>
>> >>>             try{
>> >>>             	if( connection != NULL ) delete connection;
>> >>> 			}catch ( CMSException& e ) {}
>> >>>     		connection = NULL;
>> >>>     }
>> >>> };
>> >>>
>> >>> class HelloWorldConsumer : public ExceptionListener,
>> >>>                            public MessageListener,
>> >>>                            public Runnable {
>> >>>
>> >>> private:
>> >>>
>> >>> 	Connection* connection;
>> >>> 	Session* session;
>> >>> 	Topic* destination;
>> >>> 	MessageConsumer* consumer;
>> >>> 	long waitMillis;
>> >>>
>> >>> public:
>> >>>
>> >>> 	HelloWorldConsumer( long waitMillis ){
>> >>> 		connection = NULL;
>> >>>     	session = NULL;
>> >>>     	destination = NULL;
>> >>>     	consumer = NULL;
>> >>>     	this->waitMillis = waitMillis;
>> >>> 	}
>> >>>     virtual ~HelloWorldConsumer(){
>> >>>     	cleanup();
>> >>>     }
>> >>>
>> >>>     virtual void run() {
>> >>>
>> >>>         try {
>> >>>
>> >>> 			string user,passwd,sID;
>> >>> 			user="default";
>> >>> 			passwd="";
>> >>> 			sID="lsgID";
>> >>>             // Create a ConnectionFactory
>> >>>             ActiveMQConnectionFactory* connectionFactory =
>> >>>                 new ActiveMQConnectionFactory(
>> >>> "tcp://localhost:61613",user,passwd,sID);
>> >>>
>> >>>             // Create a Connection
>> >>>             connection =
>> >>> connectionFactory->createConnection();//user,passwd,sID);
>> >>>             delete connectionFactory;
>> >>>             connection->start();
>> >>>
>> >>>             connection->setExceptionListener(this);
>> >>>
>> >>>             // Create a Session
>> >>>             session = connection->createSession(
>> >> Session::AUTO_ACKNOWLEDGE
>> >>> );
>> >>> 			destination = session->createTopic( "mytopic" );
>> >>> 			consumer = session->createDurableConsumer(
>> >> destination ,
>> >>> user ,
>> >>> "",false);
>> >>>
>> >>>             consumer->setMessageListener( this );
>> >>>
>> >>>             Thread::sleep( waitMillis );
>> >>>
>> >>>         } catch (CMSException& e) {
>> >>>             e.printStackTrace();
>> >>>         }
>> >>>     }
>> >>>
>> >>>     virtual void onMessage( const Message* message ){
>> >>>
>> >>>         try
>> >>>         {
>> >>>     	    const TextMessage* textMessage =
>> >>>                 dynamic_cast< const TextMessage* >( message );
>> >>>             string text = textMessage->getText();
>> >>>             printf( "Received: %s\n", text.c_str() );
>> >>>         } catch (CMSException& e) {
>> >>>             e.printStackTrace();
>> >>>         }
>> >>>     }
>> >>>
>> >>>     virtual void onException( const CMSException& ex ) {
>> >>>         printf("JMS Exception occured.  Shutting down client.\n");
>> >>>     }
>> >>>
>> >>> private:
>> >>>
>> >>>     void cleanup(){
>> >>>
>> >>> 		// Destroy resources.
>> >>> 		try{
>> >>>         	if( destination != NULL ) delete destination;
>> >>> 		}catch (CMSException& e) {}
>> >>> 		destination = NULL;
>> >>>
>> >>> 		try{
>> >>>             if( consumer != NULL ) delete consumer;
>> >>> 		}catch (CMSException& e) {}
>> >>> 		consumer = NULL;
>> >>>
>> >>> 		// Close open resources.
>> >>> 		try{
>> >>> 			if( session != NULL ) session->close();
>> >>> 			if( connection != NULL ) connection->close();
>> >>> 		}catch (CMSException& e) {}
>> >>>
>> >>>         try{
>> >>>         	if( session != NULL ) delete session;
>> >>> 		}catch (CMSException& e) {}
>> >>> 		session = NULL;
>> >>>
>> >>> 		try{
>> >>>         	if( connection != NULL ) delete connection;
>> >>> 		}catch (CMSException& e) {}
>> >>> 		connection = NULL;
>> >>>     }
>> >>> };
>> >>>   void Produce()
>> >>>  {
>> >>>      HelloWorldProducer producer( 2 );
>> >>>      Thread producerThread( &producer );
>> >>>      producerThread.start();
>> >>>      producerThread.join();
>> >>>  }
>> >>>  void Consumer()
>> >>>  {
>> >>>       HelloWorldConsumer consumer( 10000 );
>> >>>       Thread consumerThread( &consumer );
>> >>>       consumerThread.start();
>> >>>       consumerThread.join();
>> >>>  }
>> >>>  int main(int argc, char* argv[])
>> >>>  {
>> >>>       Produce();
>> >>>       Consumer();
>> >>>  }
>> >>> --
>> >>> View this message in context: http://www.nabble.com/above-topic-
>> >>> tf2641566.html#a7373622
>> >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >>
>> >>
>> >>
>> >
>> >
>> 
>> --
>> View this message in context: http://www.nabble.com/above-topic-
>> tf2641566.html#a7393074
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7413840
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


RE: above topic

Posted by sgliu <sh...@sina.com>.
How do I achieve a durable subscription based on my program?



sgliu wrote:
> 
> Thanks for you advice.I wil read your tell me content.
> 
> 
> 
> 
> tabish121 wrote:
>> 
>>> 
>>> 
>>> follow program,why cann't receive data?
>>> How should I modify code?
>> 
>> Looks like you produce before the durable consumer has been connected
>> once. 
>> 
>> See this link:
>> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
>> 
>> A durable consumer must have connected and then disconnected before
>> messages are persisted in anticipation of the consumer reconnecting.
>> That's why its important to use the same subscription name when
>> reconnecting, so that the broker know that the consumer that was
>> subscribed as durable is now back and it can deliver the messages that
>> were stored in its absence.  
>> 
>>> 
>>> #include <activemq/concurrent/Thread.h>
>>> #include <activemq/concurrent/Runnable.h>
>>> #include <activemq/core/ActiveMQConnectionFactory.h>
>>> #include <activemq/util/Integer.h>
>>> #include <cms/Connection.h>
>>> #include <cms/Session.h>
>>> #include <cms/TextMessage.h>
>>> #include <cms/ExceptionListener.h>
>>> #include <cms/MessageListener.h>
>>> #include <stdlib.h>
>>> 
>>> using namespace activemq::core;
>>> using namespace activemq::util;
>>> using namespace activemq::concurrent;
>>> using namespace cms;
>>> using namespace std;
>>> 
>>> class HelloWorldProducer : public Runnable {
>>> private:
>>> 
>>> 	Connection* connection;
>>> 	Session* session;
>>> 	Topic* destination;
>>> 	MessageProducer* producer;
>>> 	int numMessages;
>>> 
>>> public:
>>> 
>>> 	HelloWorldProducer( int numMessages ){
>>> 		connection = NULL;
>>>     	session = NULL;
>>>     	destination = NULL;
>>>     	producer = NULL;
>>>     	this->numMessages = numMessages;
>>> 	}
>>> 
>>> 	virtual ~HelloWorldProducer(){
>>> 		cleanup();
>>> 	}
>>> 
>>>     virtual void run() {
>>>         try {
>>> 			string user,passwd,sID;
>>> 			user="default";
>>> 			passwd="";
>>> 			sID="lsgID";
>>>             ActiveMQConnectionFactory* connectionFactory = new
>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>>> 
>>>             connection =
>>> connectionFactory->createConnection(user,passwd,sID);
>>>             connection->start();
>>> 
>>> 			string sss=connection->getClientId();
>>> 			cout << sss << endl;
>>> 
>>>             session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE
>>> );
>>> 			destination = session->createTopic( "mytopic" );
>>> 
>>> 			producer = session->createProducer( destination
>> );
>>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>> 
>>> 			producer->setTimeToLive(100000000);
>>>             string threadIdStr = Integer::toString( Thread::getId() );
>>> 
>>>             // Create a messages
>>>             string text = (string)"Hello world! from thread " +
>>> threadIdStr;
>>> 
>>>             for( int ix=0; ix<numMessages; ++ix ){
>>> 	            TextMessage* message = session->createTextMessage(
>> text
>>> );
>>> 
>>> 				string messageID="messageID";
>>> 				message->setCMSExpiration(10000000000);
>>> 				message->setCMSMessageId(messageID);
>>> 
>>>     	        // Tell the producer to send the message
>>>         	    printf( "Sent message from thread %s\n",
>>> threadIdStr.c_str() );
>>>         		producer->send( message );
>>> 
>>>             	delete message;
>>>             }
>>> 
>>>         }catch ( CMSException& e ) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> 
>>> private:
>>> 
>>>     void cleanup(){
>>> 
>>> 			// Destroy resources.
>>> 			try{
>>>             	if( destination != NULL ) delete destination;
>>> 			}catch ( CMSException& e ) {}
>>> 			destination = NULL;
>>> 
>>> 			try{
>>> 	            if( producer != NULL ) delete producer;
>>> 			}catch ( CMSException& e ) {}
>>> 			producer = NULL;
>>> 
>>>     		// Close open resources.
>>>     		try{
>>>     			if( session != NULL ) session->close();
>>>     			if( connection != NULL ) connection->close();
>>> 			}catch ( CMSException& e ) {}
>>> 
>>> 			try{
>>>             	if( session != NULL ) delete session;
>>> 			}catch ( CMSException& e ) {}
>>> 			session = NULL;
>>> 
>>>             try{
>>>             	if( connection != NULL ) delete connection;
>>> 			}catch ( CMSException& e ) {}
>>>     		connection = NULL;
>>>     }
>>> };
>>> 
>>> class HelloWorldConsumer : public ExceptionListener,
>>>                            public MessageListener,
>>>                            public Runnable {
>>> 
>>> private:
>>> 
>>> 	Connection* connection;
>>> 	Session* session;
>>> 	Topic* destination;
>>> 	MessageConsumer* consumer;
>>> 	long waitMillis;
>>> 
>>> public:
>>> 
>>> 	HelloWorldConsumer( long waitMillis ){
>>> 		connection = NULL;
>>>     	session = NULL;
>>>     	destination = NULL;
>>>     	consumer = NULL;
>>>     	this->waitMillis = waitMillis;
>>> 	}
>>>     virtual ~HelloWorldConsumer(){
>>>     	cleanup();
>>>     }
>>> 
>>>     virtual void run() {
>>> 
>>>         try {
>>> 
>>> 			string user,passwd,sID;
>>> 			user="default";
>>> 			passwd="";
>>> 			sID="lsgID";
>>>             // Create a ConnectionFactory
>>>             ActiveMQConnectionFactory* connectionFactory =
>>>                 new ActiveMQConnectionFactory(
>>> "tcp://localhost:61613",user,passwd,sID);
>>> 
>>>             // Create a Connection
>>>             connection =
>>> connectionFactory->createConnection();//user,passwd,sID);
>>>             delete connectionFactory;
>>>             connection->start();
>>> 
>>>             connection->setExceptionListener(this);
>>> 
>>>             // Create a Session
>>>             session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE
>>> );
>>> 			destination = session->createTopic( "mytopic" );
>>> 			consumer = session->createDurableConsumer(
>> destination ,
>>> user ,
>>> "",false);
>>> 
>>>             consumer->setMessageListener( this );
>>> 
>>>             Thread::sleep( waitMillis );
>>> 
>>>         } catch (CMSException& e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> 
>>>     virtual void onMessage( const Message* message ){
>>> 
>>>         try
>>>         {
>>>     	    const TextMessage* textMessage =
>>>                 dynamic_cast< const TextMessage* >( message );
>>>             string text = textMessage->getText();
>>>             printf( "Received: %s\n", text.c_str() );
>>>         } catch (CMSException& e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> 
>>>     virtual void onException( const CMSException& ex ) {
>>>         printf("JMS Exception occured.  Shutting down client.\n");
>>>     }
>>> 
>>> private:
>>> 
>>>     void cleanup(){
>>> 
>>> 		// Destroy resources.
>>> 		try{
>>>         	if( destination != NULL ) delete destination;
>>> 		}catch (CMSException& e) {}
>>> 		destination = NULL;
>>> 
>>> 		try{
>>>             if( consumer != NULL ) delete consumer;
>>> 		}catch (CMSException& e) {}
>>> 		consumer = NULL;
>>> 
>>> 		// Close open resources.
>>> 		try{
>>> 			if( session != NULL ) session->close();
>>> 			if( connection != NULL ) connection->close();
>>> 		}catch (CMSException& e) {}
>>> 
>>>         try{
>>>         	if( session != NULL ) delete session;
>>> 		}catch (CMSException& e) {}
>>> 		session = NULL;
>>> 
>>> 		try{
>>>         	if( connection != NULL ) delete connection;
>>> 		}catch (CMSException& e) {}
>>> 		connection = NULL;
>>>     }
>>> };
>>>   void Produce()
>>>  {
>>>      HelloWorldProducer producer( 2 );
>>>      Thread producerThread( &producer );
>>>      producerThread.start();
>>>      producerThread.join();
>>>  }
>>>  void Consumer()
>>>  {
>>>       HelloWorldConsumer consumer( 10000 );
>>>       Thread consumerThread( &consumer );
>>>       consumerThread.start();
>>>       consumerThread.join();
>>>  }
>>>  int main(int argc, char* argv[])
>>>  {
>>>       Produce();
>>>       Consumer();
>>>  }
>>> --
>>> View this message in context: http://www.nabble.com/above-topic-
>>> tf2641566.html#a7373622
>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> 
>> 
>> 
> 
> 

-- 
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7393074
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


RE: above topic

Posted by sgliu <sh...@sina.com>.
Thanks for your advice. I will read the content you pointed me to.




tabish121 wrote:
> 
>> 
>> 
>> follow program,why cann't receive data?
>> How should I modify code?
> 
> Looks like you produce before the durable consumer has been connected
> once. 
> 
> See this link:
> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
> 
> A durable consumer must have connected and then disconnected before
> messages are persisted in anticipation of the consumer reconnecting.
> That's why its important to use the same subscription name when
> reconnecting, so that the broker know that the consumer that was
> subscribed as durable is now back and it can deliver the messages that
> were stored in its absence.  
> 
>> 
>> #include <activemq/concurrent/Thread.h>
>> #include <activemq/concurrent/Runnable.h>
>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> #include <activemq/util/Integer.h>
>> #include <cms/Connection.h>
>> #include <cms/Session.h>
>> #include <cms/TextMessage.h>
>> #include <cms/ExceptionListener.h>
>> #include <cms/MessageListener.h>
>> #include <stdlib.h>
>> 
>> using namespace activemq::core;
>> using namespace activemq::util;
>> using namespace activemq::concurrent;
>> using namespace cms;
>> using namespace std;
>> 
>> class HelloWorldProducer : public Runnable {
>> private:
>> 
>> 	Connection* connection;
>> 	Session* session;
>> 	Topic* destination;
>> 	MessageProducer* producer;
>> 	int numMessages;
>> 
>> public:
>> 
>> 	HelloWorldProducer( int numMessages ){
>> 		connection = NULL;
>>     	session = NULL;
>>     	destination = NULL;
>>     	producer = NULL;
>>     	this->numMessages = numMessages;
>> 	}
>> 
>> 	virtual ~HelloWorldProducer(){
>> 		cleanup();
>> 	}
>> 
>>     virtual void run() {
>>         try {
>> 			string user,passwd,sID;
>> 			user="default";
>> 			passwd="";
>> 			sID="lsgID";
>>             ActiveMQConnectionFactory* connectionFactory = new
>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>> 
>>             connection =
>> connectionFactory->createConnection(user,passwd,sID);
>>             connection->start();
>> 
>> 			string sss=connection->getClientId();
>> 			cout << sss << endl;
>> 
>>             session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE
>> );
>> 			destination = session->createTopic( "mytopic" );
>> 
>> 			producer = session->createProducer( destination
> );
>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>> 
>> 			producer->setTimeToLive(100000000);
>>             string threadIdStr = Integer::toString( Thread::getId() );
>> 
>>             // Create a messages
>>             string text = (string)"Hello world! from thread " +
>> threadIdStr;
>> 
>>             for( int ix=0; ix<numMessages; ++ix ){
>> 	            TextMessage* message = session->createTextMessage(
> text
>> );
>> 
>> 				string messageID="messageID";
>> 				message->setCMSExpiration(10000000000);
>> 				message->setCMSMessageId(messageID);
>> 
>>     	        // Tell the producer to send the message
>>         	    printf( "Sent message from thread %s\n",
>> threadIdStr.c_str() );
>>         		producer->send( message );
>> 
>>             	delete message;
>>             }
>> 
>>         }catch ( CMSException& e ) {
>>             e.printStackTrace();
>>         }
>>     }
>> 
>> private:
>> 
>>     void cleanup(){
>> 
>> 			// Destroy resources.
>> 			try{
>>             	if( destination != NULL ) delete destination;
>> 			}catch ( CMSException& e ) {}
>> 			destination = NULL;
>> 
>> 			try{
>> 	            if( producer != NULL ) delete producer;
>> 			}catch ( CMSException& e ) {}
>> 			producer = NULL;
>> 
>>     		// Close open resources.
>>     		try{
>>     			if( session != NULL ) session->close();
>>     			if( connection != NULL ) connection->close();
>> 			}catch ( CMSException& e ) {}
>> 
>> 			try{
>>             	if( session != NULL ) delete session;
>> 			}catch ( CMSException& e ) {}
>> 			session = NULL;
>> 
>>             try{
>>             	if( connection != NULL ) delete connection;
>> 			}catch ( CMSException& e ) {}
>>     		connection = NULL;
>>     }
>> };
>> 
>> class HelloWorldConsumer : public ExceptionListener,
>>                            public MessageListener,
>>                            public Runnable {
>> 
>> private:
>> 
>> 	Connection* connection;
>> 	Session* session;
>> 	Topic* destination;
>> 	MessageConsumer* consumer;
>> 	long waitMillis;
>> 
>> public:
>> 
>> 	HelloWorldConsumer( long waitMillis ){
>> 		connection = NULL;
>>     	session = NULL;
>>     	destination = NULL;
>>     	consumer = NULL;
>>     	this->waitMillis = waitMillis;
>> 	}
>>     virtual ~HelloWorldConsumer(){
>>     	cleanup();
>>     }
>> 
>>     virtual void run() {
>> 
>>         try {
>> 
>> 			string user,passwd,sID;
>> 			user="default";
>> 			passwd="";
>> 			sID="lsgID";
>>             // Create a ConnectionFactory
>>             ActiveMQConnectionFactory* connectionFactory =
>>                 new ActiveMQConnectionFactory(
>> "tcp://localhost:61613",user,passwd,sID);
>> 
>>             // Create a Connection
>>             connection =
>> connectionFactory->createConnection();//user,passwd,sID);
>>             delete connectionFactory;
>>             connection->start();
>> 
>>             connection->setExceptionListener(this);
>> 
>>             // Create a Session
>>             session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE
>> );
>> 			destination = session->createTopic( "mytopic" );
>> 			consumer = session->createDurableConsumer(
> destination ,
>> user ,
>> "",false);
>> 
>>             consumer->setMessageListener( this );
>> 
>>             Thread::sleep( waitMillis );
>> 
>>         } catch (CMSException& e) {
>>             e.printStackTrace();
>>         }
>>     }
>> 
>>     virtual void onMessage( const Message* message ){
>> 
>>         try
>>         {
>>     	    const TextMessage* textMessage =
>>                 dynamic_cast< const TextMessage* >( message );
>>             string text = textMessage->getText();
>>             printf( "Received: %s\n", text.c_str() );
>>         } catch (CMSException& e) {
>>             e.printStackTrace();
>>         }
>>     }
>> 
>>     virtual void onException( const CMSException& ex ) {
>>         printf("JMS Exception occured.  Shutting down client.\n");
>>     }
>> 
>> private:
>> 
>>     void cleanup(){
>> 
>> 		// Destroy resources.
>> 		try{
>>         	if( destination != NULL ) delete destination;
>> 		}catch (CMSException& e) {}
>> 		destination = NULL;
>> 
>> 		try{
>>             if( consumer != NULL ) delete consumer;
>> 		}catch (CMSException& e) {}
>> 		consumer = NULL;
>> 
>> 		// Close open resources.
>> 		try{
>> 			if( session != NULL ) session->close();
>> 			if( connection != NULL ) connection->close();
>> 		}catch (CMSException& e) {}
>> 
>>         try{
>>         	if( session != NULL ) delete session;
>> 		}catch (CMSException& e) {}
>> 		session = NULL;
>> 
>> 		try{
>>         	if( connection != NULL ) delete connection;
>> 		}catch (CMSException& e) {}
>> 		connection = NULL;
>>     }
>> };
>>   void Produce()
>>  {
>>      HelloWorldProducer producer( 2 );
>>      Thread producerThread( &producer );
>>      producerThread.start();
>>      producerThread.join();
>>  }
>>  void Consumer()
>>  {
>>       HelloWorldConsumer consumer( 10000 );
>>       Thread consumerThread( &consumer );
>>       consumerThread.start();
>>       consumerThread.join();
>>  }
>>  int main(int argc, char* argv[])
>>  {
>>       Produce();
>>       Consumer();
>>  }
>> --
>> View this message in context: http://www.nabble.com/above-topic-
>> tf2641566.html#a7373622
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7392444
Sent from the ActiveMQ - User mailing list archive at Nabble.com.