You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by sgliu <sh...@sina.com> on 2006/11/20 01:52:56 UTC

SOS

I wish I producer  message A, and then I exit the producer program. Then I
start two consumer program(one is C1,the other is C2) at same time.C1  can
receive A , C2 can receive A.

#include <activemq/concurrent/Thread.h> 
#include <activemq/concurrent/Runnable.h> 
#include <activemq/core/ActiveMQConnectionFactory.h> 
#include <activemq/util/Integer.h> 
#include <cms/Connection.h> 
#include <cms/Session.h> 
#include <cms/TextMessage.h> 
#include <cms/ExceptionListener.h> 
#include <cms/MessageListener.h> 
#include <stdlib.h> 

using namespace activemq::core; 
using namespace activemq::util; 
using namespace activemq::concurrent; 
using namespace cms; 
using namespace std; 

class HelloWorldProducer : public Runnable { 
private: 
        
        Connection* connection; 
        Session* session; 
        Topic* destination; 
        MessageProducer* producer; 
        int numMessages; 

public: 
        
        HelloWorldProducer( int numMessages ){ 
                connection = NULL; 
    session = NULL; 
    destination = NULL; 
    producer = NULL; 
    this->numMessages = numMessages; 
        } 
        
        virtual ~HelloWorldProducer(){ 
                cleanup(); 
        } 
        
    virtual void run() { 
        try { 
                        string user,passwd,sID; 
                        user="default"; 
                        passwd=""; 
                        sID="lsgID"; 
            ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 

            connection =
connectionFactory->createConnection(user,passwd,sID); 
            connection->start(); 

                        string sss=connection->getClientId(); 
                        cout << sss << endl; 

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
); 
                        destination = session->createTopic( "mytopic" ); 

                        producer = session->createProducer( destination ); 
            producer->setDeliveryMode( DeliveryMode::PERSISTANT ); 
            
                        producer->setTimeToLive(100000000); 
            string threadIdStr = Integer::toString( Thread::getId() ); 
            
            // Create a messages 
            string text = (string)"Hello world! from thread " + threadIdStr; 
            
            for( int ix=0; ix<numMessages; ++ix ){ 
                    TextMessage* message = session->createTextMessage( text
); 

                                string messageID="messageID"; 
                                message->setCMSExpiration(10000000000); 
                                message->setCMSMessageId(messageID); 

           // Tell the producer to send the message 
           printf( "Sent message from thread %s\n", threadIdStr.c_str() ); 
        producer->send( message ); 
            
            delete message; 
            } 
                        
        }catch ( CMSException& e ) { 
            e.printStackTrace(); 
        } 
    } 
    
private: 

    void cleanup(){ 
    
                        // Destroy resources. 
                        try{                         
            if( destination != NULL ) delete destination; 
                        }catch ( CMSException& e ) {} 
                        destination = NULL; 
                        
                        try{ 
                    if( producer != NULL ) delete producer; 
                        }catch ( CMSException& e ) {} 
                        producer = NULL; 
                        
    // Close open resources. 
    try{ 
    if( session != NULL ) session->close(); 
    if( connection != NULL ) connection->close(); 
                        }catch ( CMSException& e ) {} 

                        try{ 
            if( session != NULL ) delete session; 
                        }catch ( CMSException& e ) {} 
                        session = NULL; 
                        
            try{ 
            if( connection != NULL ) delete connection; 
                        }catch ( CMSException& e ) {} 
    connection = NULL; 
    } 
}; 

class HelloWorldConsumer : public ExceptionListener, 
                           public MessageListener, 
                           public Runnable { 
        
private: 
        
        Connection* connection; 
        Session* session; 
        Topic* destination; 
        MessageConsumer* consumer; 
        long waitMillis; 
                
public: 

        HelloWorldConsumer( long waitMillis ){ 
                connection = NULL; 
    session = NULL; 
    destination = NULL; 
    consumer = NULL; 
    this->waitMillis = waitMillis; 
        } 
    virtual ~HelloWorldConsumer(){     
    cleanup(); 
    } 
    
    virtual void run() { 
        
        try { 

                        string user,passwd,sID; 
                        user="default"; 
                        passwd=""; 
                        sID="lsgID"; 
            // Create a ConnectionFactory 
            ActiveMQConnectionFactory* connectionFactory = 
                new ActiveMQConnectionFactory(
"tcp://localhost:61613",user,passwd,sID); 

            // Create a Connection 
            connection =
connectionFactory->createConnection();//user,passwd,sID); 
            delete connectionFactory; 
            connection->start(); 
            
            connection->setExceptionListener(this); 

            // Create a Session 
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
); 
                        destination = session->createTopic( "mytopic" ); 
                        consumer = session->createDurableConsumer(
destination , user , "",false); 
            
            consumer->setMessageListener( this ); 
            
            Thread::sleep( waitMillis ); 
            
        } catch (CMSException& e) { 
            e.printStackTrace(); 
        } 
    } 
    
    virtual void onMessage( const Message* message ){ 
    
        try 
        { 
       const TextMessage* textMessage = 
                dynamic_cast< const TextMessage* >( message ); 
            string text = textMessage->getText(); 
            printf( "Received: %s\n", text.c_str() ); 
        } catch (CMSException& e) { 
            e.printStackTrace(); 
        } 
    } 

    virtual void onException( const CMSException& ex ) { 
        printf("JMS Exception occured.  Shutting down client.\n"); 
    } 
    
private: 

    void cleanup(){ 
    
                // Destroy resources. 
                try{                         
        if( destination != NULL ) delete destination; 
                }catch (CMSException& e) {} 
                destination = NULL; 
                
                try{ 
            if( consumer != NULL ) delete consumer; 
                }catch (CMSException& e) {} 
                consumer = NULL; 
                
                // Close open resources. 
                try{ 
                        if( session != NULL ) session->close(); 
                        if( connection != NULL ) connection->close(); 
                }catch (CMSException& e) {} 
                
        try{ 
        if( session != NULL ) delete session; 
                }catch (CMSException& e) {} 
                session = NULL; 
                
                try{ 
        if( connection != NULL ) delete connection; 
                }catch (CMSException& e) {} 
                connection = NULL; 
    } 
}; 
  void Produce() 
 { 
     HelloWorldProducer producer( 2 ); 
     Thread producerThread( &producer ); 
     producerThread.start(); 
     producerThread.join(); 
 } 
 void Consumer1() 
 { 
      HelloWorldConsumer consumer( 10000 ); 
      Thread consumerThread( &consumer ); 
      consumerThread.start(); 
      consumerThread.join(); 
 } 
 void Consumer2() 
 { 
      HelloWorldConsumer consumer( 10000 ); 
      Thread consumerThread( &consumer ); 
      consumerThread.start(); 
      consumerThread.join(); 
 } 
 int main(int argc, char* argv[]) 
 { 
      Produce(); 
      Consumer1(); 
      Consumer2(); 
 }
-- 
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7435462
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: SOS

Posted by sgliu <sh...@sina.com>.
"retroactive consumers",
follow code,just I want.
queue = new ActiveMQQueue("TEST.QUEUE?consumer.retroactive=true");
consumer = session.createConsumer(queue);

Thanks a lot.



Adrian Co wrote:
> 
> I do not understand what you want. If you want to send first, then still 
> receive later using topics, you could try using retroactive consumers: 
> http://www.activemq.org/site/retroactive-consumer.html or create durable 
> subscribers for more reliable messaging,
> 
> sgliu wrote:
>> Send first,then receive.In topic.setCMSExpiration() function?
>> What can I do?
>>
>>
>>
>> Adrian Co wrote:
>>   
>>> Try subscribing the consumers first before sending messages.
>>>
>>> int main(int argc, char* argv[]) 
>>>  { 
>>>       Consumer1(); 
>>>       Consumer2(); 
>>>
>>>       Produce(); 
>>>
>>>  }
>>>
>>>
>>>
>>> sgliu wrote:
>>>     
>>>> Please help me.
>>>>
>>>>
>>>> sgliu wrote:
>>>>   
>>>>       
>>>>> I wish I producer  message A, and then I exit the producer program.
>>>>> Then
>>>>> I
>>>>> start two consumer program(one is C1,the other is C2) at same time.C1 
>>>>> can
>>>>> receive A , C2 can receive A.
>>>>>
>>>>> #include <activemq/concurrent/Thread.h> 
>>>>> #include <activemq/concurrent/Runnable.h> 
>>>>> #include <activemq/core/ActiveMQConnectionFactory.h> 
>>>>> #include <activemq/util/Integer.h> 
>>>>> #include <cms/Connection.h> 
>>>>> #include <cms/Session.h> 
>>>>> #include <cms/TextMessage.h> 
>>>>> #include <cms/ExceptionListener.h> 
>>>>> #include <cms/MessageListener.h> 
>>>>> #include <stdlib.h> 
>>>>>
>>>>> using namespace activemq::core; 
>>>>> using namespace activemq::util; 
>>>>> using namespace activemq::concurrent; 
>>>>> using namespace cms; 
>>>>> using namespace std; 
>>>>>
>>>>> class HelloWorldProducer : public Runnable { 
>>>>> private: 
>>>>>         
>>>>>         Connection* connection; 
>>>>>         Session* session; 
>>>>>         Topic* destination; 
>>>>>         MessageProducer* producer; 
>>>>>         int numMessages; 
>>>>>
>>>>> public: 
>>>>>         
>>>>>         HelloWorldProducer( int numMessages ){ 
>>>>>                 connection = NULL; 
>>>>>     session = NULL; 
>>>>>     destination = NULL; 
>>>>>     producer = NULL; 
>>>>>     this->numMessages = numMessages; 
>>>>>         } 
>>>>>         
>>>>>         virtual ~HelloWorldProducer(){ 
>>>>>                 cleanup(); 
>>>>>         } 
>>>>>         
>>>>>     virtual void run() { 
>>>>>         try { 
>>>>>                         string user,passwd,sID; 
>>>>>                         user="default"; 
>>>>>                         passwd=""; 
>>>>>                         sID="lsgID"; 
>>>>>             ActiveMQConnectionFactory* connectionFactory = new
>>>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 
>>>>>
>>>>>             connection =
>>>>> connectionFactory->createConnection(user,passwd,sID); 
>>>>>             connection->start(); 
>>>>>
>>>>>                         string sss=connection->getClientId(); 
>>>>>                         cout << sss << endl; 
>>>>>
>>>>>             session = connection->createSession(
>>>>> Session::AUTO_ACKNOWLEDGE
>>>>> ); 
>>>>>                         destination = session->createTopic( "mytopic"
>>>>> ); 
>>>>>
>>>>>                         producer = session->createProducer(
>>>>> destination
>>>>> ); 
>>>>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT ); 
>>>>>             
>>>>>                         producer->setTimeToLive(100000000); 
>>>>>             string threadIdStr = Integer::toString( Thread::getId() ); 
>>>>>             
>>>>>             // Create a messages 
>>>>>             string text = (string)"Hello world! from thread " +
>>>>> threadIdStr; 
>>>>>             
>>>>>             for( int ix=0; ix<numMessages; ++ix ){ 
>>>>>                     TextMessage* message = session->createTextMessage(
>>>>> text ); 
>>>>>
>>>>>                                 string messageID="messageID"; 
>>>>>                                
>>>>> message->setCMSExpiration(10000000000); 
>>>>>                                 message->setCMSMessageId(messageID); 
>>>>>
>>>>>            // Tell the producer to send the message 
>>>>>            printf( "Sent message from thread %s\n",
>>>>> threadIdStr.c_str()
>>>>> ); 
>>>>>         producer->send( message ); 
>>>>>             
>>>>>             delete message; 
>>>>>             } 
>>>>>                         
>>>>>         }catch ( CMSException& e ) { 
>>>>>             e.printStackTrace(); 
>>>>>         } 
>>>>>     } 
>>>>>     
>>>>> private: 
>>>>>
>>>>>     void cleanup(){ 
>>>>>     
>>>>>                         // Destroy resources. 
>>>>>                         try{                         
>>>>>             if( destination != NULL ) delete destination; 
>>>>>                         }catch ( CMSException& e ) {} 
>>>>>                         destination = NULL; 
>>>>>                         
>>>>>                         try{ 
>>>>>                     if( producer != NULL ) delete producer; 
>>>>>                         }catch ( CMSException& e ) {} 
>>>>>                         producer = NULL; 
>>>>>                         
>>>>>     // Close open resources. 
>>>>>     try{ 
>>>>>     if( session != NULL ) session->close(); 
>>>>>     if( connection != NULL ) connection->close(); 
>>>>>                         }catch ( CMSException& e ) {} 
>>>>>
>>>>>                         try{ 
>>>>>             if( session != NULL ) delete session; 
>>>>>                         }catch ( CMSException& e ) {} 
>>>>>                         session = NULL; 
>>>>>                         
>>>>>             try{ 
>>>>>             if( connection != NULL ) delete connection; 
>>>>>                         }catch ( CMSException& e ) {} 
>>>>>     connection = NULL; 
>>>>>     } 
>>>>> }; 
>>>>>
>>>>> class HelloWorldConsumer : public ExceptionListener, 
>>>>>                            public MessageListener, 
>>>>>                            public Runnable { 
>>>>>         
>>>>> private: 
>>>>>         
>>>>>         Connection* connection; 
>>>>>         Session* session; 
>>>>>         Topic* destination; 
>>>>>         MessageConsumer* consumer; 
>>>>>         long waitMillis; 
>>>>>                 
>>>>> public: 
>>>>>
>>>>>         HelloWorldConsumer( long waitMillis ){ 
>>>>>                 connection = NULL; 
>>>>>     session = NULL; 
>>>>>     destination = NULL; 
>>>>>     consumer = NULL; 
>>>>>     this->waitMillis = waitMillis; 
>>>>>         } 
>>>>>     virtual ~HelloWorldConsumer(){     
>>>>>     cleanup(); 
>>>>>     } 
>>>>>     
>>>>>     virtual void run() { 
>>>>>         
>>>>>         try { 
>>>>>
>>>>>                         string user,passwd,sID; 
>>>>>                         user="default"; 
>>>>>                         passwd=""; 
>>>>>                         sID="lsgID"; 
>>>>>             // Create a ConnectionFactory 
>>>>>             ActiveMQConnectionFactory* connectionFactory = 
>>>>>                 new ActiveMQConnectionFactory(
>>>>> "tcp://localhost:61613",user,passwd,sID); 
>>>>>
>>>>>             // Create a Connection 
>>>>>             connection =
>>>>> connectionFactory->createConnection();//user,passwd,sID); 
>>>>>             delete connectionFactory; 
>>>>>             connection->start(); 
>>>>>             
>>>>>             connection->setExceptionListener(this); 
>>>>>
>>>>>             // Create a Session 
>>>>>             session = connection->createSession(
>>>>> Session::AUTO_ACKNOWLEDGE
>>>>> ); 
>>>>>                         destination = session->createTopic( "mytopic"
>>>>> ); 
>>>>>                         consumer = session->createDurableConsumer(
>>>>> destination , user , "",false); 
>>>>>             
>>>>>             consumer->setMessageListener( this ); 
>>>>>             
>>>>>             Thread::sleep( waitMillis ); 
>>>>>             
>>>>>         } catch (CMSException& e) { 
>>>>>             e.printStackTrace(); 
>>>>>         } 
>>>>>     } 
>>>>>     
>>>>>     virtual void onMessage( const Message* message ){ 
>>>>>     
>>>>>         try 
>>>>>         { 
>>>>>        const TextMessage* textMessage = 
>>>>>                 dynamic_cast< const TextMessage* >( message ); 
>>>>>             string text = textMessage->getText(); 
>>>>>             printf( "Received: %s\n", text.c_str() ); 
>>>>>         } catch (CMSException& e) { 
>>>>>             e.printStackTrace(); 
>>>>>         } 
>>>>>     } 
>>>>>
>>>>>     virtual void onException( const CMSException& ex ) { 
>>>>>         printf("JMS Exception occured.  Shutting down client.\n"); 
>>>>>     } 
>>>>>     
>>>>> private: 
>>>>>
>>>>>     void cleanup(){ 
>>>>>     
>>>>>                 // Destroy resources. 
>>>>>                 try{                         
>>>>>         if( destination != NULL ) delete destination; 
>>>>>                 }catch (CMSException& e) {} 
>>>>>                 destination = NULL; 
>>>>>                 
>>>>>                 try{ 
>>>>>             if( consumer != NULL ) delete consumer; 
>>>>>                 }catch (CMSException& e) {} 
>>>>>                 consumer = NULL; 
>>>>>                 
>>>>>                 // Close open resources. 
>>>>>                 try{ 
>>>>>                         if( session != NULL ) session->close(); 
>>>>>                         if( connection != NULL ) connection->close(); 
>>>>>                 }catch (CMSException& e) {} 
>>>>>                 
>>>>>         try{ 
>>>>>         if( session != NULL ) delete session; 
>>>>>                 }catch (CMSException& e) {} 
>>>>>                 session = NULL; 
>>>>>                 
>>>>>                 try{ 
>>>>>         if( connection != NULL ) delete connection; 
>>>>>                 }catch (CMSException& e) {} 
>>>>>                 connection = NULL; 
>>>>>     } 
>>>>> }; 
>>>>>   void Produce() 
>>>>>  { 
>>>>>      HelloWorldProducer producer( 2 ); 
>>>>>      Thread producerThread( &producer ); 
>>>>>      producerThread.start(); 
>>>>>      producerThread.join(); 
>>>>>  } 
>>>>>  void Consumer1() 
>>>>>  { 
>>>>>       HelloWorldConsumer consumer( 10000 ); 
>>>>>       Thread consumerThread( &consumer ); 
>>>>>       consumerThread.start(); 
>>>>>       consumerThread.join(); 
>>>>>  } 
>>>>>  void Consumer2() 
>>>>>  { 
>>>>>       HelloWorldConsumer consumer( 10000 ); 
>>>>>       Thread consumerThread( &consumer ); 
>>>>>       consumerThread.start(); 
>>>>>       consumerThread.join(); 
>>>>>  } 
>>>>>  int main(int argc, char* argv[]) 
>>>>>  { 
>>>>>       Produce(); 
>>>>>       Consumer1(); 
>>>>>       Consumer2(); 
>>>>>  }
>>>>>
>>>>>     
>>>>>         
>>>>   
>>>>       
>>>
>>>     
>>
>>   
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7535583
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: SOS

Posted by Adrian Co <ac...@exist.com>.
I do not understand what you want. If you want to send first, then still 
receive later using topics, you could try using retroactive consumers: 
http://www.activemq.org/site/retroactive-consumer.html or create durable 
subscribers for more reliable messaging,

sgliu wrote:
> Send first,then receive.In topic.setCMSExpiration() function?
> What can I do?
>
>
>
> Adrian Co wrote:
>   
>> Try subscribing the consumers first before sending messages.
>>
>> int main(int argc, char* argv[]) 
>>  { 
>>       Consumer1(); 
>>       Consumer2(); 
>>
>>       Produce(); 
>>
>>  }
>>
>>
>>
>> sgliu wrote:
>>     
>>> Please help me.
>>>
>>>
>>> sgliu wrote:
>>>   
>>>       
>>>> I wish I producer  message A, and then I exit the producer program. Then
>>>> I
>>>> start two consumer program(one is C1,the other is C2) at same time.C1 
>>>> can
>>>> receive A , C2 can receive A.
>>>>
>>>> #include <activemq/concurrent/Thread.h> 
>>>> #include <activemq/concurrent/Runnable.h> 
>>>> #include <activemq/core/ActiveMQConnectionFactory.h> 
>>>> #include <activemq/util/Integer.h> 
>>>> #include <cms/Connection.h> 
>>>> #include <cms/Session.h> 
>>>> #include <cms/TextMessage.h> 
>>>> #include <cms/ExceptionListener.h> 
>>>> #include <cms/MessageListener.h> 
>>>> #include <stdlib.h> 
>>>>
>>>> using namespace activemq::core; 
>>>> using namespace activemq::util; 
>>>> using namespace activemq::concurrent; 
>>>> using namespace cms; 
>>>> using namespace std; 
>>>>
>>>> class HelloWorldProducer : public Runnable { 
>>>> private: 
>>>>         
>>>>         Connection* connection; 
>>>>         Session* session; 
>>>>         Topic* destination; 
>>>>         MessageProducer* producer; 
>>>>         int numMessages; 
>>>>
>>>> public: 
>>>>         
>>>>         HelloWorldProducer( int numMessages ){ 
>>>>                 connection = NULL; 
>>>>     session = NULL; 
>>>>     destination = NULL; 
>>>>     producer = NULL; 
>>>>     this->numMessages = numMessages; 
>>>>         } 
>>>>         
>>>>         virtual ~HelloWorldProducer(){ 
>>>>                 cleanup(); 
>>>>         } 
>>>>         
>>>>     virtual void run() { 
>>>>         try { 
>>>>                         string user,passwd,sID; 
>>>>                         user="default"; 
>>>>                         passwd=""; 
>>>>                         sID="lsgID"; 
>>>>             ActiveMQConnectionFactory* connectionFactory = new
>>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 
>>>>
>>>>             connection =
>>>> connectionFactory->createConnection(user,passwd,sID); 
>>>>             connection->start(); 
>>>>
>>>>                         string sss=connection->getClientId(); 
>>>>                         cout << sss << endl; 
>>>>
>>>>             session = connection->createSession(
>>>> Session::AUTO_ACKNOWLEDGE
>>>> ); 
>>>>                         destination = session->createTopic( "mytopic" ); 
>>>>
>>>>                         producer = session->createProducer( destination
>>>> ); 
>>>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT ); 
>>>>             
>>>>                         producer->setTimeToLive(100000000); 
>>>>             string threadIdStr = Integer::toString( Thread::getId() ); 
>>>>             
>>>>             // Create a messages 
>>>>             string text = (string)"Hello world! from thread " +
>>>> threadIdStr; 
>>>>             
>>>>             for( int ix=0; ix<numMessages; ++ix ){ 
>>>>                     TextMessage* message = session->createTextMessage(
>>>> text ); 
>>>>
>>>>                                 string messageID="messageID"; 
>>>>                                 message->setCMSExpiration(10000000000); 
>>>>                                 message->setCMSMessageId(messageID); 
>>>>
>>>>            // Tell the producer to send the message 
>>>>            printf( "Sent message from thread %s\n", threadIdStr.c_str()
>>>> ); 
>>>>         producer->send( message ); 
>>>>             
>>>>             delete message; 
>>>>             } 
>>>>                         
>>>>         }catch ( CMSException& e ) { 
>>>>             e.printStackTrace(); 
>>>>         } 
>>>>     } 
>>>>     
>>>> private: 
>>>>
>>>>     void cleanup(){ 
>>>>     
>>>>                         // Destroy resources. 
>>>>                         try{                         
>>>>             if( destination != NULL ) delete destination; 
>>>>                         }catch ( CMSException& e ) {} 
>>>>                         destination = NULL; 
>>>>                         
>>>>                         try{ 
>>>>                     if( producer != NULL ) delete producer; 
>>>>                         }catch ( CMSException& e ) {} 
>>>>                         producer = NULL; 
>>>>                         
>>>>     // Close open resources. 
>>>>     try{ 
>>>>     if( session != NULL ) session->close(); 
>>>>     if( connection != NULL ) connection->close(); 
>>>>                         }catch ( CMSException& e ) {} 
>>>>
>>>>                         try{ 
>>>>             if( session != NULL ) delete session; 
>>>>                         }catch ( CMSException& e ) {} 
>>>>                         session = NULL; 
>>>>                         
>>>>             try{ 
>>>>             if( connection != NULL ) delete connection; 
>>>>                         }catch ( CMSException& e ) {} 
>>>>     connection = NULL; 
>>>>     } 
>>>> }; 
>>>>
>>>> class HelloWorldConsumer : public ExceptionListener, 
>>>>                            public MessageListener, 
>>>>                            public Runnable { 
>>>>         
>>>> private: 
>>>>         
>>>>         Connection* connection; 
>>>>         Session* session; 
>>>>         Topic* destination; 
>>>>         MessageConsumer* consumer; 
>>>>         long waitMillis; 
>>>>                 
>>>> public: 
>>>>
>>>>         HelloWorldConsumer( long waitMillis ){ 
>>>>                 connection = NULL; 
>>>>     session = NULL; 
>>>>     destination = NULL; 
>>>>     consumer = NULL; 
>>>>     this->waitMillis = waitMillis; 
>>>>         } 
>>>>     virtual ~HelloWorldConsumer(){     
>>>>     cleanup(); 
>>>>     } 
>>>>     
>>>>     virtual void run() { 
>>>>         
>>>>         try { 
>>>>
>>>>                         string user,passwd,sID; 
>>>>                         user="default"; 
>>>>                         passwd=""; 
>>>>                         sID="lsgID"; 
>>>>             // Create a ConnectionFactory 
>>>>             ActiveMQConnectionFactory* connectionFactory = 
>>>>                 new ActiveMQConnectionFactory(
>>>> "tcp://localhost:61613",user,passwd,sID); 
>>>>
>>>>             // Create a Connection 
>>>>             connection =
>>>> connectionFactory->createConnection();//user,passwd,sID); 
>>>>             delete connectionFactory; 
>>>>             connection->start(); 
>>>>             
>>>>             connection->setExceptionListener(this); 
>>>>
>>>>             // Create a Session 
>>>>             session = connection->createSession(
>>>> Session::AUTO_ACKNOWLEDGE
>>>> ); 
>>>>                         destination = session->createTopic( "mytopic" ); 
>>>>                         consumer = session->createDurableConsumer(
>>>> destination , user , "",false); 
>>>>             
>>>>             consumer->setMessageListener( this ); 
>>>>             
>>>>             Thread::sleep( waitMillis ); 
>>>>             
>>>>         } catch (CMSException& e) { 
>>>>             e.printStackTrace(); 
>>>>         } 
>>>>     } 
>>>>     
>>>>     virtual void onMessage( const Message* message ){ 
>>>>     
>>>>         try 
>>>>         { 
>>>>        const TextMessage* textMessage = 
>>>>                 dynamic_cast< const TextMessage* >( message ); 
>>>>             string text = textMessage->getText(); 
>>>>             printf( "Received: %s\n", text.c_str() ); 
>>>>         } catch (CMSException& e) { 
>>>>             e.printStackTrace(); 
>>>>         } 
>>>>     } 
>>>>
>>>>     virtual void onException( const CMSException& ex ) { 
>>>>         printf("JMS Exception occured.  Shutting down client.\n"); 
>>>>     } 
>>>>     
>>>> private: 
>>>>
>>>>     void cleanup(){ 
>>>>     
>>>>                 // Destroy resources. 
>>>>                 try{                         
>>>>         if( destination != NULL ) delete destination; 
>>>>                 }catch (CMSException& e) {} 
>>>>                 destination = NULL; 
>>>>                 
>>>>                 try{ 
>>>>             if( consumer != NULL ) delete consumer; 
>>>>                 }catch (CMSException& e) {} 
>>>>                 consumer = NULL; 
>>>>                 
>>>>                 // Close open resources. 
>>>>                 try{ 
>>>>                         if( session != NULL ) session->close(); 
>>>>                         if( connection != NULL ) connection->close(); 
>>>>                 }catch (CMSException& e) {} 
>>>>                 
>>>>         try{ 
>>>>         if( session != NULL ) delete session; 
>>>>                 }catch (CMSException& e) {} 
>>>>                 session = NULL; 
>>>>                 
>>>>                 try{ 
>>>>         if( connection != NULL ) delete connection; 
>>>>                 }catch (CMSException& e) {} 
>>>>                 connection = NULL; 
>>>>     } 
>>>> }; 
>>>>   void Produce() 
>>>>  { 
>>>>      HelloWorldProducer producer( 2 ); 
>>>>      Thread producerThread( &producer ); 
>>>>      producerThread.start(); 
>>>>      producerThread.join(); 
>>>>  } 
>>>>  void Consumer1() 
>>>>  { 
>>>>       HelloWorldConsumer consumer( 10000 ); 
>>>>       Thread consumerThread( &consumer ); 
>>>>       consumerThread.start(); 
>>>>       consumerThread.join(); 
>>>>  } 
>>>>  void Consumer2() 
>>>>  { 
>>>>       HelloWorldConsumer consumer( 10000 ); 
>>>>       Thread consumerThread( &consumer ); 
>>>>       consumerThread.start(); 
>>>>       consumerThread.join(); 
>>>>  } 
>>>>  int main(int argc, char* argv[]) 
>>>>  { 
>>>>       Produce(); 
>>>>       Consumer1(); 
>>>>       Consumer2(); 
>>>>  }
>>>>
>>>>     
>>>>         
>>>   
>>>       
>>
>>     
>
>   


Re: SOS

Posted by sgliu <sh...@sina.com>.
Send first,then receive.In topic.setCMSExpiration() function?
What can I do?



Adrian Co wrote:
> 
> Try subscribing the consumers first before sending messages.
> 
> int main(int argc, char* argv[]) 
>  { 
>       Consumer1(); 
>       Consumer2(); 
> 
>       Produce(); 
> 
>  }
> 
> 
> 
> sgliu wrote:
>> Please help me.
>>
>>
>> sgliu wrote:
>>   
>>> I wish I producer  message A, and then I exit the producer program. Then
>>> I
>>> start two consumer program(one is C1,the other is C2) at same time.C1 
>>> can
>>> receive A , C2 can receive A.
>>>
>>> #include <activemq/concurrent/Thread.h> 
>>> #include <activemq/concurrent/Runnable.h> 
>>> #include <activemq/core/ActiveMQConnectionFactory.h> 
>>> #include <activemq/util/Integer.h> 
>>> #include <cms/Connection.h> 
>>> #include <cms/Session.h> 
>>> #include <cms/TextMessage.h> 
>>> #include <cms/ExceptionListener.h> 
>>> #include <cms/MessageListener.h> 
>>> #include <stdlib.h> 
>>>
>>> using namespace activemq::core; 
>>> using namespace activemq::util; 
>>> using namespace activemq::concurrent; 
>>> using namespace cms; 
>>> using namespace std; 
>>>
>>> class HelloWorldProducer : public Runnable { 
>>> private: 
>>>         
>>>         Connection* connection; 
>>>         Session* session; 
>>>         Topic* destination; 
>>>         MessageProducer* producer; 
>>>         int numMessages; 
>>>
>>> public: 
>>>         
>>>         HelloWorldProducer( int numMessages ){ 
>>>                 connection = NULL; 
>>>     session = NULL; 
>>>     destination = NULL; 
>>>     producer = NULL; 
>>>     this->numMessages = numMessages; 
>>>         } 
>>>         
>>>         virtual ~HelloWorldProducer(){ 
>>>                 cleanup(); 
>>>         } 
>>>         
>>>     virtual void run() { 
>>>         try { 
>>>                         string user,passwd,sID; 
>>>                         user="default"; 
>>>                         passwd=""; 
>>>                         sID="lsgID"; 
>>>             ActiveMQConnectionFactory* connectionFactory = new
>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 
>>>
>>>             connection =
>>> connectionFactory->createConnection(user,passwd,sID); 
>>>             connection->start(); 
>>>
>>>                         string sss=connection->getClientId(); 
>>>                         cout << sss << endl; 
>>>
>>>             session = connection->createSession(
>>> Session::AUTO_ACKNOWLEDGE
>>> ); 
>>>                         destination = session->createTopic( "mytopic" ); 
>>>
>>>                         producer = session->createProducer( destination
>>> ); 
>>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT ); 
>>>             
>>>                         producer->setTimeToLive(100000000); 
>>>             string threadIdStr = Integer::toString( Thread::getId() ); 
>>>             
>>>             // Create a messages 
>>>             string text = (string)"Hello world! from thread " +
>>> threadIdStr; 
>>>             
>>>             for( int ix=0; ix<numMessages; ++ix ){ 
>>>                     TextMessage* message = session->createTextMessage(
>>> text ); 
>>>
>>>                                 string messageID="messageID"; 
>>>                                 message->setCMSExpiration(10000000000); 
>>>                                 message->setCMSMessageId(messageID); 
>>>
>>>            // Tell the producer to send the message 
>>>            printf( "Sent message from thread %s\n", threadIdStr.c_str()
>>> ); 
>>>         producer->send( message ); 
>>>             
>>>             delete message; 
>>>             } 
>>>                         
>>>         }catch ( CMSException& e ) { 
>>>             e.printStackTrace(); 
>>>         } 
>>>     } 
>>>     
>>> private: 
>>>
>>>     void cleanup(){ 
>>>     
>>>                         // Destroy resources. 
>>>                         try{                         
>>>             if( destination != NULL ) delete destination; 
>>>                         }catch ( CMSException& e ) {} 
>>>                         destination = NULL; 
>>>                         
>>>                         try{ 
>>>                     if( producer != NULL ) delete producer; 
>>>                         }catch ( CMSException& e ) {} 
>>>                         producer = NULL; 
>>>                         
>>>     // Close open resources. 
>>>     try{ 
>>>     if( session != NULL ) session->close(); 
>>>     if( connection != NULL ) connection->close(); 
>>>                         }catch ( CMSException& e ) {} 
>>>
>>>                         try{ 
>>>             if( session != NULL ) delete session; 
>>>                         }catch ( CMSException& e ) {} 
>>>                         session = NULL; 
>>>                         
>>>             try{ 
>>>             if( connection != NULL ) delete connection; 
>>>                         }catch ( CMSException& e ) {} 
>>>     connection = NULL; 
>>>     } 
>>> }; 
>>>
>>> class HelloWorldConsumer : public ExceptionListener, 
>>>                            public MessageListener, 
>>>                            public Runnable { 
>>>         
>>> private: 
>>>         
>>>         Connection* connection; 
>>>         Session* session; 
>>>         Topic* destination; 
>>>         MessageConsumer* consumer; 
>>>         long waitMillis; 
>>>                 
>>> public: 
>>>
>>>         HelloWorldConsumer( long waitMillis ){ 
>>>                 connection = NULL; 
>>>     session = NULL; 
>>>     destination = NULL; 
>>>     consumer = NULL; 
>>>     this->waitMillis = waitMillis; 
>>>         } 
>>>     virtual ~HelloWorldConsumer(){     
>>>     cleanup(); 
>>>     } 
>>>     
>>>     virtual void run() { 
>>>         
>>>         try { 
>>>
>>>                         string user,passwd,sID; 
>>>                         user="default"; 
>>>                         passwd=""; 
>>>                         sID="lsgID"; 
>>>             // Create a ConnectionFactory 
>>>             ActiveMQConnectionFactory* connectionFactory = 
>>>                 new ActiveMQConnectionFactory(
>>> "tcp://localhost:61613",user,passwd,sID); 
>>>
>>>             // Create a Connection 
>>>             connection =
>>> connectionFactory->createConnection();//user,passwd,sID); 
>>>             delete connectionFactory; 
>>>             connection->start(); 
>>>             
>>>             connection->setExceptionListener(this); 
>>>
>>>             // Create a Session 
>>>             session = connection->createSession(
>>> Session::AUTO_ACKNOWLEDGE
>>> ); 
>>>                         destination = session->createTopic( "mytopic" ); 
>>>                         consumer = session->createDurableConsumer(
>>> destination , user , "",false); 
>>>             
>>>             consumer->setMessageListener( this ); 
>>>             
>>>             Thread::sleep( waitMillis ); 
>>>             
>>>         } catch (CMSException& e) { 
>>>             e.printStackTrace(); 
>>>         } 
>>>     } 
>>>     
>>>     virtual void onMessage( const Message* message ){ 
>>>     
>>>         try 
>>>         { 
>>>        const TextMessage* textMessage = 
>>>                 dynamic_cast< const TextMessage* >( message ); 
>>>             string text = textMessage->getText(); 
>>>             printf( "Received: %s\n", text.c_str() ); 
>>>         } catch (CMSException& e) { 
>>>             e.printStackTrace(); 
>>>         } 
>>>     } 
>>>
>>>     virtual void onException( const CMSException& ex ) { 
>>>         printf("JMS Exception occured.  Shutting down client.\n"); 
>>>     } 
>>>     
>>> private: 
>>>
>>>     void cleanup(){ 
>>>     
>>>                 // Destroy resources. 
>>>                 try{                         
>>>         if( destination != NULL ) delete destination; 
>>>                 }catch (CMSException& e) {} 
>>>                 destination = NULL; 
>>>                 
>>>                 try{ 
>>>             if( consumer != NULL ) delete consumer; 
>>>                 }catch (CMSException& e) {} 
>>>                 consumer = NULL; 
>>>                 
>>>                 // Close open resources. 
>>>                 try{ 
>>>                         if( session != NULL ) session->close(); 
>>>                         if( connection != NULL ) connection->close(); 
>>>                 }catch (CMSException& e) {} 
>>>                 
>>>         try{ 
>>>         if( session != NULL ) delete session; 
>>>                 }catch (CMSException& e) {} 
>>>                 session = NULL; 
>>>                 
>>>                 try{ 
>>>         if( connection != NULL ) delete connection; 
>>>                 }catch (CMSException& e) {} 
>>>                 connection = NULL; 
>>>     } 
>>> }; 
>>>   void Produce() 
>>>  { 
>>>      HelloWorldProducer producer( 2 ); 
>>>      Thread producerThread( &producer ); 
>>>      producerThread.start(); 
>>>      producerThread.join(); 
>>>  } 
>>>  void Consumer1() 
>>>  { 
>>>       HelloWorldConsumer consumer( 10000 ); 
>>>       Thread consumerThread( &consumer ); 
>>>       consumerThread.start(); 
>>>       consumerThread.join(); 
>>>  } 
>>>  void Consumer2() 
>>>  { 
>>>       HelloWorldConsumer consumer( 10000 ); 
>>>       Thread consumerThread( &consumer ); 
>>>       consumerThread.start(); 
>>>       consumerThread.join(); 
>>>  } 
>>>  int main(int argc, char* argv[]) 
>>>  { 
>>>       Produce(); 
>>>       Consumer1(); 
>>>       Consumer2(); 
>>>  }
>>>
>>>     
>>
>>   
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7500819
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: SOS

Posted by Adrian Co <ac...@exist.com>.
Try subscribing the consumers first before sending messages.

int main(int argc, char* argv[]) 
 { 
      Consumer1(); 
      Consumer2(); 

      Produce(); 

 }



sgliu wrote:
> Please help me.
>
>
> sgliu wrote:
>   
>> I wish I producer  message A, and then I exit the producer program. Then I
>> start two consumer program(one is C1,the other is C2) at same time.C1  can
>> receive A , C2 can receive A.
>>
>> #include <activemq/concurrent/Thread.h> 
>> #include <activemq/concurrent/Runnable.h> 
>> #include <activemq/core/ActiveMQConnectionFactory.h> 
>> #include <activemq/util/Integer.h> 
>> #include <cms/Connection.h> 
>> #include <cms/Session.h> 
>> #include <cms/TextMessage.h> 
>> #include <cms/ExceptionListener.h> 
>> #include <cms/MessageListener.h> 
>> #include <stdlib.h> 
>>
>> using namespace activemq::core; 
>> using namespace activemq::util; 
>> using namespace activemq::concurrent; 
>> using namespace cms; 
>> using namespace std; 
>>
>> class HelloWorldProducer : public Runnable { 
>> private: 
>>         
>>         Connection* connection; 
>>         Session* session; 
>>         Topic* destination; 
>>         MessageProducer* producer; 
>>         int numMessages; 
>>
>> public: 
>>         
>>         HelloWorldProducer( int numMessages ){ 
>>                 connection = NULL; 
>>     session = NULL; 
>>     destination = NULL; 
>>     producer = NULL; 
>>     this->numMessages = numMessages; 
>>         } 
>>         
>>         virtual ~HelloWorldProducer(){ 
>>                 cleanup(); 
>>         } 
>>         
>>     virtual void run() { 
>>         try { 
>>                         string user,passwd,sID; 
>>                         user="default"; 
>>                         passwd=""; 
>>                         sID="lsgID"; 
>>             ActiveMQConnectionFactory* connectionFactory = new
>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 
>>
>>             connection =
>> connectionFactory->createConnection(user,passwd,sID); 
>>             connection->start(); 
>>
>>                         string sss=connection->getClientId(); 
>>                         cout << sss << endl; 
>>
>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
>> ); 
>>                         destination = session->createTopic( "mytopic" ); 
>>
>>                         producer = session->createProducer( destination ); 
>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT ); 
>>             
>>                         producer->setTimeToLive(100000000); 
>>             string threadIdStr = Integer::toString( Thread::getId() ); 
>>             
>>             // Create a messages 
>>             string text = (string)"Hello world! from thread " +
>> threadIdStr; 
>>             
>>             for( int ix=0; ix<numMessages; ++ix ){ 
>>                     TextMessage* message = session->createTextMessage(
>> text ); 
>>
>>                                 string messageID="messageID"; 
>>                                 message->setCMSExpiration(10000000000); 
>>                                 message->setCMSMessageId(messageID); 
>>
>>            // Tell the producer to send the message 
>>            printf( "Sent message from thread %s\n", threadIdStr.c_str() ); 
>>         producer->send( message ); 
>>             
>>             delete message; 
>>             } 
>>                         
>>         }catch ( CMSException& e ) { 
>>             e.printStackTrace(); 
>>         } 
>>     } 
>>     
>> private: 
>>
>>     void cleanup(){ 
>>     
>>                         // Destroy resources. 
>>                         try{                         
>>             if( destination != NULL ) delete destination; 
>>                         }catch ( CMSException& e ) {} 
>>                         destination = NULL; 
>>                         
>>                         try{ 
>>                     if( producer != NULL ) delete producer; 
>>                         }catch ( CMSException& e ) {} 
>>                         producer = NULL; 
>>                         
>>     // Close open resources. 
>>     try{ 
>>     if( session != NULL ) session->close(); 
>>     if( connection != NULL ) connection->close(); 
>>                         }catch ( CMSException& e ) {} 
>>
>>                         try{ 
>>             if( session != NULL ) delete session; 
>>                         }catch ( CMSException& e ) {} 
>>                         session = NULL; 
>>                         
>>             try{ 
>>             if( connection != NULL ) delete connection; 
>>                         }catch ( CMSException& e ) {} 
>>     connection = NULL; 
>>     } 
>> }; 
>>
>> class HelloWorldConsumer : public ExceptionListener, 
>>                            public MessageListener, 
>>                            public Runnable { 
>>         
>> private: 
>>         
>>         Connection* connection; 
>>         Session* session; 
>>         Topic* destination; 
>>         MessageConsumer* consumer; 
>>         long waitMillis; 
>>                 
>> public: 
>>
>>         HelloWorldConsumer( long waitMillis ){ 
>>                 connection = NULL; 
>>     session = NULL; 
>>     destination = NULL; 
>>     consumer = NULL; 
>>     this->waitMillis = waitMillis; 
>>         } 
>>     virtual ~HelloWorldConsumer(){     
>>     cleanup(); 
>>     } 
>>     
>>     virtual void run() { 
>>         
>>         try { 
>>
>>                         string user,passwd,sID; 
>>                         user="default"; 
>>                         passwd=""; 
>>                         sID="lsgID"; 
>>             // Create a ConnectionFactory 
>>             ActiveMQConnectionFactory* connectionFactory = 
>>                 new ActiveMQConnectionFactory(
>> "tcp://localhost:61613",user,passwd,sID); 
>>
>>             // Create a Connection 
>>             connection =
>> connectionFactory->createConnection();//user,passwd,sID); 
>>             delete connectionFactory; 
>>             connection->start(); 
>>             
>>             connection->setExceptionListener(this); 
>>
>>             // Create a Session 
>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
>> ); 
>>                         destination = session->createTopic( "mytopic" ); 
>>                         consumer = session->createDurableConsumer(
>> destination , user , "",false); 
>>             
>>             consumer->setMessageListener( this ); 
>>             
>>             Thread::sleep( waitMillis ); 
>>             
>>         } catch (CMSException& e) { 
>>             e.printStackTrace(); 
>>         } 
>>     } 
>>     
>>     virtual void onMessage( const Message* message ){ 
>>     
>>         try 
>>         { 
>>        const TextMessage* textMessage = 
>>                 dynamic_cast< const TextMessage* >( message ); 
>>             string text = textMessage->getText(); 
>>             printf( "Received: %s\n", text.c_str() ); 
>>         } catch (CMSException& e) { 
>>             e.printStackTrace(); 
>>         } 
>>     } 
>>
>>     virtual void onException( const CMSException& ex ) { 
>>         printf("JMS Exception occured.  Shutting down client.\n"); 
>>     } 
>>     
>> private: 
>>
>>     void cleanup(){ 
>>     
>>                 // Destroy resources. 
>>                 try{                         
>>         if( destination != NULL ) delete destination; 
>>                 }catch (CMSException& e) {} 
>>                 destination = NULL; 
>>                 
>>                 try{ 
>>             if( consumer != NULL ) delete consumer; 
>>                 }catch (CMSException& e) {} 
>>                 consumer = NULL; 
>>                 
>>                 // Close open resources. 
>>                 try{ 
>>                         if( session != NULL ) session->close(); 
>>                         if( connection != NULL ) connection->close(); 
>>                 }catch (CMSException& e) {} 
>>                 
>>         try{ 
>>         if( session != NULL ) delete session; 
>>                 }catch (CMSException& e) {} 
>>                 session = NULL; 
>>                 
>>                 try{ 
>>         if( connection != NULL ) delete connection; 
>>                 }catch (CMSException& e) {} 
>>                 connection = NULL; 
>>     } 
>> }; 
>>   void Produce() 
>>  { 
>>      HelloWorldProducer producer( 2 ); 
>>      Thread producerThread( &producer ); 
>>      producerThread.start(); 
>>      producerThread.join(); 
>>  } 
>>  void Consumer1() 
>>  { 
>>       HelloWorldConsumer consumer( 10000 ); 
>>       Thread consumerThread( &consumer ); 
>>       consumerThread.start(); 
>>       consumerThread.join(); 
>>  } 
>>  void Consumer2() 
>>  { 
>>       HelloWorldConsumer consumer( 10000 ); 
>>       Thread consumerThread( &consumer ); 
>>       consumerThread.start(); 
>>       consumerThread.join(); 
>>  } 
>>  int main(int argc, char* argv[]) 
>>  { 
>>       Produce(); 
>>       Consumer1(); 
>>       Consumer2(); 
>>  }
>>
>>     
>
>   


Re: SOS

Posted by sgliu <sh...@sina.com>.
Please help me.


sgliu wrote:
> 
> I wish I producer  message A, and then I exit the producer program. Then I
> start two consumer program(one is C1,the other is C2) at same time.C1  can
> receive A , C2 can receive A.
> 
> #include <activemq/concurrent/Thread.h> 
> #include <activemq/concurrent/Runnable.h> 
> #include <activemq/core/ActiveMQConnectionFactory.h> 
> #include <activemq/util/Integer.h> 
> #include <cms/Connection.h> 
> #include <cms/Session.h> 
> #include <cms/TextMessage.h> 
> #include <cms/ExceptionListener.h> 
> #include <cms/MessageListener.h> 
> #include <stdlib.h> 
> 
> using namespace activemq::core; 
> using namespace activemq::util; 
> using namespace activemq::concurrent; 
> using namespace cms; 
> using namespace std; 
> 
> class HelloWorldProducer : public Runnable { 
> private: 
>         
>         Connection* connection; 
>         Session* session; 
>         Topic* destination; 
>         MessageProducer* producer; 
>         int numMessages; 
> 
> public: 
>         
>         HelloWorldProducer( int numMessages ){ 
>                 connection = NULL; 
>     session = NULL; 
>     destination = NULL; 
>     producer = NULL; 
>     this->numMessages = numMessages; 
>         } 
>         
>         virtual ~HelloWorldProducer(){ 
>                 cleanup(); 
>         } 
>         
>     virtual void run() { 
>         try { 
>                         string user,passwd,sID; 
>                         user="default"; 
>                         passwd=""; 
>                         sID="lsgID"; 
>             ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 
> 
>             connection =
> connectionFactory->createConnection(user,passwd,sID); 
>             connection->start(); 
> 
>                         string sss=connection->getClientId(); 
>                         cout << sss << endl; 
> 
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
> ); 
>                         destination = session->createTopic( "mytopic" ); 
> 
>                         producer = session->createProducer( destination ); 
>             producer->setDeliveryMode( DeliveryMode::PERSISTANT ); 
>             
>                         producer->setTimeToLive(100000000); 
>             string threadIdStr = Integer::toString( Thread::getId() ); 
>             
>             // Create a messages 
>             string text = (string)"Hello world! from thread " +
> threadIdStr; 
>             
>             for( int ix=0; ix<numMessages; ++ix ){ 
>                     TextMessage* message = session->createTextMessage(
> text ); 
> 
>                                 string messageID="messageID"; 
>                                 message->setCMSExpiration(10000000000); 
>                                 message->setCMSMessageId(messageID); 
> 
>            // Tell the producer to send the message 
>            printf( "Sent message from thread %s\n", threadIdStr.c_str() ); 
>         producer->send( message ); 
>             
>             delete message; 
>             } 
>                         
>         }catch ( CMSException& e ) { 
>             e.printStackTrace(); 
>         } 
>     } 
>     
> private: 
> 
>     void cleanup(){ 
>     
>                         // Destroy resources. 
>                         try{                         
>             if( destination != NULL ) delete destination; 
>                         }catch ( CMSException& e ) {} 
>                         destination = NULL; 
>                         
>                         try{ 
>                     if( producer != NULL ) delete producer; 
>                         }catch ( CMSException& e ) {} 
>                         producer = NULL; 
>                         
>     // Close open resources. 
>     try{ 
>     if( session != NULL ) session->close(); 
>     if( connection != NULL ) connection->close(); 
>                         }catch ( CMSException& e ) {} 
> 
>                         try{ 
>             if( session != NULL ) delete session; 
>                         }catch ( CMSException& e ) {} 
>                         session = NULL; 
>                         
>             try{ 
>             if( connection != NULL ) delete connection; 
>                         }catch ( CMSException& e ) {} 
>     connection = NULL; 
>     } 
> }; 
> 
> class HelloWorldConsumer : public ExceptionListener, 
>                            public MessageListener, 
>                            public Runnable { 
>         
> private: 
>         
>         Connection* connection; 
>         Session* session; 
>         Topic* destination; 
>         MessageConsumer* consumer; 
>         long waitMillis; 
>                 
> public: 
> 
>         HelloWorldConsumer( long waitMillis ){ 
>                 connection = NULL; 
>     session = NULL; 
>     destination = NULL; 
>     consumer = NULL; 
>     this->waitMillis = waitMillis; 
>         } 
>     virtual ~HelloWorldConsumer(){     
>     cleanup(); 
>     } 
>     
>     virtual void run() { 
>         
>         try { 
> 
>                         string user,passwd,sID; 
>                         user="default"; 
>                         passwd=""; 
>                         sID="lsgID"; 
>             // Create a ConnectionFactory 
>             ActiveMQConnectionFactory* connectionFactory = 
>                 new ActiveMQConnectionFactory(
> "tcp://localhost:61613",user,passwd,sID); 
> 
>             // Create a Connection 
>             connection =
> connectionFactory->createConnection();//user,passwd,sID); 
>             delete connectionFactory; 
>             connection->start(); 
>             
>             connection->setExceptionListener(this); 
> 
>             // Create a Session 
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
> ); 
>                         destination = session->createTopic( "mytopic" ); 
>                         consumer = session->createDurableConsumer(
> destination , user , "",false); 
>             
>             consumer->setMessageListener( this ); 
>             
>             Thread::sleep( waitMillis ); 
>             
>         } catch (CMSException& e) { 
>             e.printStackTrace(); 
>         } 
>     } 
>     
>     virtual void onMessage( const Message* message ){ 
>     
>         try 
>         { 
>        const TextMessage* textMessage = 
>                 dynamic_cast< const TextMessage* >( message ); 
>             string text = textMessage->getText(); 
>             printf( "Received: %s\n", text.c_str() ); 
>         } catch (CMSException& e) { 
>             e.printStackTrace(); 
>         } 
>     } 
> 
>     virtual void onException( const CMSException& ex ) { 
>         printf("JMS Exception occured.  Shutting down client.\n"); 
>     } 
>     
> private: 
> 
>     void cleanup(){ 
>     
>                 // Destroy resources. 
>                 try{                         
>         if( destination != NULL ) delete destination; 
>                 }catch (CMSException& e) {} 
>                 destination = NULL; 
>                 
>                 try{ 
>             if( consumer != NULL ) delete consumer; 
>                 }catch (CMSException& e) {} 
>                 consumer = NULL; 
>                 
>                 // Close open resources. 
>                 try{ 
>                         if( session != NULL ) session->close(); 
>                         if( connection != NULL ) connection->close(); 
>                 }catch (CMSException& e) {} 
>                 
>         try{ 
>         if( session != NULL ) delete session; 
>                 }catch (CMSException& e) {} 
>                 session = NULL; 
>                 
>                 try{ 
>         if( connection != NULL ) delete connection; 
>                 }catch (CMSException& e) {} 
>                 connection = NULL; 
>     } 
> }; 
>   void Produce() 
>  { 
>      HelloWorldProducer producer( 2 ); 
>      Thread producerThread( &producer ); 
>      producerThread.start(); 
>      producerThread.join(); 
>  } 
>  void Consumer1() 
>  { 
>       HelloWorldConsumer consumer( 10000 ); 
>       Thread consumerThread( &consumer ); 
>       consumerThread.start(); 
>       consumerThread.join(); 
>  } 
>  void Consumer2() 
>  { 
>       HelloWorldConsumer consumer( 10000 ); 
>       Thread consumerThread( &consumer ); 
>       consumerThread.start(); 
>       consumerThread.join(); 
>  } 
>  int main(int argc, char* argv[]) 
>  { 
>       Produce(); 
>       Consumer1(); 
>       Consumer2(); 
>  }
> 

-- 
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7485492
Sent from the ActiveMQ - User mailing list archive at Nabble.com.