You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by sgliu <sh...@sina.com> on 2006/11/20 01:52:56 UTC
SOS
I want to produce message A and then exit the producer program. Then I
start two consumer programs (one is C1, the other is C2) at the same time.
Both C1 and C2 should receive A.
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;
class HelloWorldProducer : public Runnable {
private:
Connection* connection;
Session* session;
Topic* destination;
MessageProducer* producer;
int numMessages;
public:
HelloWorldProducer( int numMessages ){
connection = NULL;
session = NULL;
destination = NULL;
producer = NULL;
this->numMessages = numMessages;
}
virtual ~HelloWorldProducer(){
cleanup();
}
virtual void run() {
try {
string user,passwd,sID;
user="default";
passwd="";
sID="lsgID";
ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
connection =
connectionFactory->createConnection(user,passwd,sID);
connection->start();
string sss=connection->getClientId();
cout << sss << endl;
session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
destination = session->createTopic( "mytopic" );
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::PERSISTANT );
producer->setTimeToLive(100000000);
string threadIdStr = Integer::toString( Thread::getId() );
// Create a messages
string text = (string)"Hello world! from thread " + threadIdStr;
for( int ix=0; ix<numMessages; ++ix ){
TextMessage* message = session->createTextMessage( text
);
string messageID="messageID";
message->setCMSExpiration(10000000000);
message->setCMSMessageId(messageID);
// Tell the producer to send the message
printf( "Sent message from thread %s\n", threadIdStr.c_str() );
producer->send( message );
delete message;
}
}catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
void cleanup(){
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch ( CMSException& e ) {}
destination = NULL;
try{
if( producer != NULL ) delete producer;
}catch ( CMSException& e ) {}
producer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch ( CMSException& e ) {}
try{
if( session != NULL ) delete session;
}catch ( CMSException& e ) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch ( CMSException& e ) {}
connection = NULL;
}
};
class HelloWorldConsumer : public ExceptionListener,
public MessageListener,
public Runnable {
private:
Connection* connection;
Session* session;
Topic* destination;
MessageConsumer* consumer;
long waitMillis;
public:
HelloWorldConsumer( long waitMillis ){
connection = NULL;
session = NULL;
destination = NULL;
consumer = NULL;
this->waitMillis = waitMillis;
}
virtual ~HelloWorldConsumer(){
cleanup();
}
virtual void run() {
try {
string user,passwd,sID;
user="default";
passwd="";
sID="lsgID";
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory(
"tcp://localhost:61613",user,passwd,sID);
// Create a Connection
connection =
connectionFactory->createConnection();//user,passwd,sID);
delete connectionFactory;
connection->start();
connection->setExceptionListener(this);
// Create a Session
session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
destination = session->createTopic( "mytopic" );
consumer = session->createDurableConsumer(
destination , user , "",false);
consumer->setMessageListener( this );
Thread::sleep( waitMillis );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onMessage( const Message* message ){
try
{
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = textMessage->getText();
printf( "Received: %s\n", text.c_str() );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onException( const CMSException& ex ) {
printf("JMS Exception occured. Shutting down client.\n");
}
private:
void cleanup(){
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) {}
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) {}
consumer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) {}
try{
if( session != NULL ) delete session;
}catch (CMSException& e) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) {}
connection = NULL;
}
};
void Produce()
{
HelloWorldProducer producer( 2 );
Thread producerThread( &producer );
producerThread.start();
producerThread.join();
}
void Consumer1()
{
HelloWorldConsumer consumer( 10000 );
Thread consumerThread( &consumer );
consumerThread.start();
consumerThread.join();
}
void Consumer2()
{
HelloWorldConsumer consumer( 10000 );
Thread consumerThread( &consumer );
consumerThread.start();
consumerThread.join();
}
// Entry point: publishes first, then runs the two consumers one after the
// other. Each call blocks until its worker thread has finished.
int main(int argc, char* argv[])
{
    Produce();

    Consumer1();
    Consumer2();

    return 0;
}
--
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7435462
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
Re: SOS
Posted by sgliu <sh...@sina.com>.
"Retroactive consumers" --
the following code is just what I want:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.retroactive=true");
consumer = session.createConsumer(queue);
Thanks a lot.
Adrian Co wrote:
>
> I do not understand what you want. If you want to send first, then still
> receive later using topics, you could try using retroactive consumers:
> http://www.activemq.org/site/retroactive-consumer.html or create durable
> subscribers for more reliable messaging,
>
> sgliu wrote:
>> Send first,then receive.In topic.setCMSExpiration() function?
>> What can I do?
>>
>>
>>
>> Adrian Co wrote:
>>
>>> Try subscribing the consumers first before sending messages.
>>>
>>> int main(int argc, char* argv[])
>>> {
>>> Consumer1();
>>> Consumer2();
>>>
>>> Produce();
>>>
>>> }
>>>
>>>
>>>
>>> sgliu wrote:
>>>
>>>> Please help me.
>>>>
>>>>
>>>> sgliu wrote:
>>>>
>>>>
>>>>> I wish I producer message A, and then I exit the producer program.
>>>>> Then
>>>>> I
>>>>> start two consumer program(one is C1,the other is C2) at same time.C1
>>>>> can
>>>>> receive A , C2 can receive A.
>>>>>
>>>>> #include <activemq/concurrent/Thread.h>
>>>>> #include <activemq/concurrent/Runnable.h>
>>>>> #include <activemq/core/ActiveMQConnectionFactory.h>
>>>>> #include <activemq/util/Integer.h>
>>>>> #include <cms/Connection.h>
>>>>> #include <cms/Session.h>
>>>>> #include <cms/TextMessage.h>
>>>>> #include <cms/ExceptionListener.h>
>>>>> #include <cms/MessageListener.h>
>>>>> #include <stdlib.h>
>>>>>
>>>>> using namespace activemq::core;
>>>>> using namespace activemq::util;
>>>>> using namespace activemq::concurrent;
>>>>> using namespace cms;
>>>>> using namespace std;
>>>>>
>>>>> class HelloWorldProducer : public Runnable {
>>>>> private:
>>>>>
>>>>> Connection* connection;
>>>>> Session* session;
>>>>> Topic* destination;
>>>>> MessageProducer* producer;
>>>>> int numMessages;
>>>>>
>>>>> public:
>>>>>
>>>>> HelloWorldProducer( int numMessages ){
>>>>> connection = NULL;
>>>>> session = NULL;
>>>>> destination = NULL;
>>>>> producer = NULL;
>>>>> this->numMessages = numMessages;
>>>>> }
>>>>>
>>>>> virtual ~HelloWorldProducer(){
>>>>> cleanup();
>>>>> }
>>>>>
>>>>> virtual void run() {
>>>>> try {
>>>>> string user,passwd,sID;
>>>>> user="default";
>>>>> passwd="";
>>>>> sID="lsgID";
>>>>> ActiveMQConnectionFactory* connectionFactory = new
>>>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>>>>>
>>>>> connection =
>>>>> connectionFactory->createConnection(user,passwd,sID);
>>>>> connection->start();
>>>>>
>>>>> string sss=connection->getClientId();
>>>>> cout << sss << endl;
>>>>>
>>>>> session = connection->createSession(
>>>>> Session::AUTO_ACKNOWLEDGE
>>>>> );
>>>>> destination = session->createTopic( "mytopic"
>>>>> );
>>>>>
>>>>> producer = session->createProducer(
>>>>> destination
>>>>> );
>>>>> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>>>>
>>>>> producer->setTimeToLive(100000000);
>>>>> string threadIdStr = Integer::toString( Thread::getId() );
>>>>>
>>>>> // Create a messages
>>>>> string text = (string)"Hello world! from thread " +
>>>>> threadIdStr;
>>>>>
>>>>> for( int ix=0; ix<numMessages; ++ix ){
>>>>> TextMessage* message = session->createTextMessage(
>>>>> text );
>>>>>
>>>>> string messageID="messageID";
>>>>>
>>>>> message->setCMSExpiration(10000000000);
>>>>> message->setCMSMessageId(messageID);
>>>>>
>>>>> // Tell the producer to send the message
>>>>> printf( "Sent message from thread %s\n",
>>>>> threadIdStr.c_str()
>>>>> );
>>>>> producer->send( message );
>>>>>
>>>>> delete message;
>>>>> }
>>>>>
>>>>> }catch ( CMSException& e ) {
>>>>> e.printStackTrace();
>>>>> }
>>>>> }
>>>>>
>>>>> private:
>>>>>
>>>>> void cleanup(){
>>>>>
>>>>> // Destroy resources.
>>>>> try{
>>>>> if( destination != NULL ) delete destination;
>>>>> }catch ( CMSException& e ) {}
>>>>> destination = NULL;
>>>>>
>>>>> try{
>>>>> if( producer != NULL ) delete producer;
>>>>> }catch ( CMSException& e ) {}
>>>>> producer = NULL;
>>>>>
>>>>> // Close open resources.
>>>>> try{
>>>>> if( session != NULL ) session->close();
>>>>> if( connection != NULL ) connection->close();
>>>>> }catch ( CMSException& e ) {}
>>>>>
>>>>> try{
>>>>> if( session != NULL ) delete session;
>>>>> }catch ( CMSException& e ) {}
>>>>> session = NULL;
>>>>>
>>>>> try{
>>>>> if( connection != NULL ) delete connection;
>>>>> }catch ( CMSException& e ) {}
>>>>> connection = NULL;
>>>>> }
>>>>> };
>>>>>
>>>>> class HelloWorldConsumer : public ExceptionListener,
>>>>> public MessageListener,
>>>>> public Runnable {
>>>>>
>>>>> private:
>>>>>
>>>>> Connection* connection;
>>>>> Session* session;
>>>>> Topic* destination;
>>>>> MessageConsumer* consumer;
>>>>> long waitMillis;
>>>>>
>>>>> public:
>>>>>
>>>>> HelloWorldConsumer( long waitMillis ){
>>>>> connection = NULL;
>>>>> session = NULL;
>>>>> destination = NULL;
>>>>> consumer = NULL;
>>>>> this->waitMillis = waitMillis;
>>>>> }
>>>>> virtual ~HelloWorldConsumer(){
>>>>> cleanup();
>>>>> }
>>>>>
>>>>> virtual void run() {
>>>>>
>>>>> try {
>>>>>
>>>>> string user,passwd,sID;
>>>>> user="default";
>>>>> passwd="";
>>>>> sID="lsgID";
>>>>> // Create a ConnectionFactory
>>>>> ActiveMQConnectionFactory* connectionFactory =
>>>>> new ActiveMQConnectionFactory(
>>>>> "tcp://localhost:61613",user,passwd,sID);
>>>>>
>>>>> // Create a Connection
>>>>> connection =
>>>>> connectionFactory->createConnection();//user,passwd,sID);
>>>>> delete connectionFactory;
>>>>> connection->start();
>>>>>
>>>>> connection->setExceptionListener(this);
>>>>>
>>>>> // Create a Session
>>>>> session = connection->createSession(
>>>>> Session::AUTO_ACKNOWLEDGE
>>>>> );
>>>>> destination = session->createTopic( "mytopic"
>>>>> );
>>>>> consumer = session->createDurableConsumer(
>>>>> destination , user , "",false);
>>>>>
>>>>> consumer->setMessageListener( this );
>>>>>
>>>>> Thread::sleep( waitMillis );
>>>>>
>>>>> } catch (CMSException& e) {
>>>>> e.printStackTrace();
>>>>> }
>>>>> }
>>>>>
>>>>> virtual void onMessage( const Message* message ){
>>>>>
>>>>> try
>>>>> {
>>>>> const TextMessage* textMessage =
>>>>> dynamic_cast< const TextMessage* >( message );
>>>>> string text = textMessage->getText();
>>>>> printf( "Received: %s\n", text.c_str() );
>>>>> } catch (CMSException& e) {
>>>>> e.printStackTrace();
>>>>> }
>>>>> }
>>>>>
>>>>> virtual void onException( const CMSException& ex ) {
>>>>> printf("JMS Exception occured. Shutting down client.\n");
>>>>> }
>>>>>
>>>>> private:
>>>>>
>>>>> void cleanup(){
>>>>>
>>>>> // Destroy resources.
>>>>> try{
>>>>> if( destination != NULL ) delete destination;
>>>>> }catch (CMSException& e) {}
>>>>> destination = NULL;
>>>>>
>>>>> try{
>>>>> if( consumer != NULL ) delete consumer;
>>>>> }catch (CMSException& e) {}
>>>>> consumer = NULL;
>>>>>
>>>>> // Close open resources.
>>>>> try{
>>>>> if( session != NULL ) session->close();
>>>>> if( connection != NULL ) connection->close();
>>>>> }catch (CMSException& e) {}
>>>>>
>>>>> try{
>>>>> if( session != NULL ) delete session;
>>>>> }catch (CMSException& e) {}
>>>>> session = NULL;
>>>>>
>>>>> try{
>>>>> if( connection != NULL ) delete connection;
>>>>> }catch (CMSException& e) {}
>>>>> connection = NULL;
>>>>> }
>>>>> };
>>>>> void Produce()
>>>>> {
>>>>> HelloWorldProducer producer( 2 );
>>>>> Thread producerThread( &producer );
>>>>> producerThread.start();
>>>>> producerThread.join();
>>>>> }
>>>>> void Consumer1()
>>>>> {
>>>>> HelloWorldConsumer consumer( 10000 );
>>>>> Thread consumerThread( &consumer );
>>>>> consumerThread.start();
>>>>> consumerThread.join();
>>>>> }
>>>>> void Consumer2()
>>>>> {
>>>>> HelloWorldConsumer consumer( 10000 );
>>>>> Thread consumerThread( &consumer );
>>>>> consumerThread.start();
>>>>> consumerThread.join();
>>>>> }
>>>>> int main(int argc, char* argv[])
>>>>> {
>>>>> Produce();
>>>>> Consumer1();
>>>>> Consumer2();
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>
>
--
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7535583
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
Re: SOS
Posted by Adrian Co <ac...@exist.com>.
I do not understand what you want. If you want to send first, then still
receive later using topics, you could try using retroactive consumers:
http://www.activemq.org/site/retroactive-consumer.html or create durable
subscribers for more reliable messaging.
sgliu wrote:
> Send first,then receive.In topic.setCMSExpiration() function?
> What can I do?
>
>
>
> Adrian Co wrote:
>
>> Try subscribing the consumers first before sending messages.
>>
>> int main(int argc, char* argv[])
>> {
>> Consumer1();
>> Consumer2();
>>
>> Produce();
>>
>> }
>>
>>
>>
>> sgliu wrote:
>>
>>> Please help me.
>>>
>>>
>>> sgliu wrote:
>>>
>>>
>>>> I wish I producer message A, and then I exit the producer program. Then
>>>> I
>>>> start two consumer program(one is C1,the other is C2) at same time.C1
>>>> can
>>>> receive A , C2 can receive A.
>>>>
>>>> #include <activemq/concurrent/Thread.h>
>>>> #include <activemq/concurrent/Runnable.h>
>>>> #include <activemq/core/ActiveMQConnectionFactory.h>
>>>> #include <activemq/util/Integer.h>
>>>> #include <cms/Connection.h>
>>>> #include <cms/Session.h>
>>>> #include <cms/TextMessage.h>
>>>> #include <cms/ExceptionListener.h>
>>>> #include <cms/MessageListener.h>
>>>> #include <stdlib.h>
>>>>
>>>> using namespace activemq::core;
>>>> using namespace activemq::util;
>>>> using namespace activemq::concurrent;
>>>> using namespace cms;
>>>> using namespace std;
>>>>
>>>> class HelloWorldProducer : public Runnable {
>>>> private:
>>>>
>>>> Connection* connection;
>>>> Session* session;
>>>> Topic* destination;
>>>> MessageProducer* producer;
>>>> int numMessages;
>>>>
>>>> public:
>>>>
>>>> HelloWorldProducer( int numMessages ){
>>>> connection = NULL;
>>>> session = NULL;
>>>> destination = NULL;
>>>> producer = NULL;
>>>> this->numMessages = numMessages;
>>>> }
>>>>
>>>> virtual ~HelloWorldProducer(){
>>>> cleanup();
>>>> }
>>>>
>>>> virtual void run() {
>>>> try {
>>>> string user,passwd,sID;
>>>> user="default";
>>>> passwd="";
>>>> sID="lsgID";
>>>> ActiveMQConnectionFactory* connectionFactory = new
>>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>>>>
>>>> connection =
>>>> connectionFactory->createConnection(user,passwd,sID);
>>>> connection->start();
>>>>
>>>> string sss=connection->getClientId();
>>>> cout << sss << endl;
>>>>
>>>> session = connection->createSession(
>>>> Session::AUTO_ACKNOWLEDGE
>>>> );
>>>> destination = session->createTopic( "mytopic" );
>>>>
>>>> producer = session->createProducer( destination
>>>> );
>>>> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>>>
>>>> producer->setTimeToLive(100000000);
>>>> string threadIdStr = Integer::toString( Thread::getId() );
>>>>
>>>> // Create a messages
>>>> string text = (string)"Hello world! from thread " +
>>>> threadIdStr;
>>>>
>>>> for( int ix=0; ix<numMessages; ++ix ){
>>>> TextMessage* message = session->createTextMessage(
>>>> text );
>>>>
>>>> string messageID="messageID";
>>>> message->setCMSExpiration(10000000000);
>>>> message->setCMSMessageId(messageID);
>>>>
>>>> // Tell the producer to send the message
>>>> printf( "Sent message from thread %s\n", threadIdStr.c_str()
>>>> );
>>>> producer->send( message );
>>>>
>>>> delete message;
>>>> }
>>>>
>>>> }catch ( CMSException& e ) {
>>>> e.printStackTrace();
>>>> }
>>>> }
>>>>
>>>> private:
>>>>
>>>> void cleanup(){
>>>>
>>>> // Destroy resources.
>>>> try{
>>>> if( destination != NULL ) delete destination;
>>>> }catch ( CMSException& e ) {}
>>>> destination = NULL;
>>>>
>>>> try{
>>>> if( producer != NULL ) delete producer;
>>>> }catch ( CMSException& e ) {}
>>>> producer = NULL;
>>>>
>>>> // Close open resources.
>>>> try{
>>>> if( session != NULL ) session->close();
>>>> if( connection != NULL ) connection->close();
>>>> }catch ( CMSException& e ) {}
>>>>
>>>> try{
>>>> if( session != NULL ) delete session;
>>>> }catch ( CMSException& e ) {}
>>>> session = NULL;
>>>>
>>>> try{
>>>> if( connection != NULL ) delete connection;
>>>> }catch ( CMSException& e ) {}
>>>> connection = NULL;
>>>> }
>>>> };
>>>>
>>>> class HelloWorldConsumer : public ExceptionListener,
>>>> public MessageListener,
>>>> public Runnable {
>>>>
>>>> private:
>>>>
>>>> Connection* connection;
>>>> Session* session;
>>>> Topic* destination;
>>>> MessageConsumer* consumer;
>>>> long waitMillis;
>>>>
>>>> public:
>>>>
>>>> HelloWorldConsumer( long waitMillis ){
>>>> connection = NULL;
>>>> session = NULL;
>>>> destination = NULL;
>>>> consumer = NULL;
>>>> this->waitMillis = waitMillis;
>>>> }
>>>> virtual ~HelloWorldConsumer(){
>>>> cleanup();
>>>> }
>>>>
>>>> virtual void run() {
>>>>
>>>> try {
>>>>
>>>> string user,passwd,sID;
>>>> user="default";
>>>> passwd="";
>>>> sID="lsgID";
>>>> // Create a ConnectionFactory
>>>> ActiveMQConnectionFactory* connectionFactory =
>>>> new ActiveMQConnectionFactory(
>>>> "tcp://localhost:61613",user,passwd,sID);
>>>>
>>>> // Create a Connection
>>>> connection =
>>>> connectionFactory->createConnection();//user,passwd,sID);
>>>> delete connectionFactory;
>>>> connection->start();
>>>>
>>>> connection->setExceptionListener(this);
>>>>
>>>> // Create a Session
>>>> session = connection->createSession(
>>>> Session::AUTO_ACKNOWLEDGE
>>>> );
>>>> destination = session->createTopic( "mytopic" );
>>>> consumer = session->createDurableConsumer(
>>>> destination , user , "",false);
>>>>
>>>> consumer->setMessageListener( this );
>>>>
>>>> Thread::sleep( waitMillis );
>>>>
>>>> } catch (CMSException& e) {
>>>> e.printStackTrace();
>>>> }
>>>> }
>>>>
>>>> virtual void onMessage( const Message* message ){
>>>>
>>>> try
>>>> {
>>>> const TextMessage* textMessage =
>>>> dynamic_cast< const TextMessage* >( message );
>>>> string text = textMessage->getText();
>>>> printf( "Received: %s\n", text.c_str() );
>>>> } catch (CMSException& e) {
>>>> e.printStackTrace();
>>>> }
>>>> }
>>>>
>>>> virtual void onException( const CMSException& ex ) {
>>>> printf("JMS Exception occured. Shutting down client.\n");
>>>> }
>>>>
>>>> private:
>>>>
>>>> void cleanup(){
>>>>
>>>> // Destroy resources.
>>>> try{
>>>> if( destination != NULL ) delete destination;
>>>> }catch (CMSException& e) {}
>>>> destination = NULL;
>>>>
>>>> try{
>>>> if( consumer != NULL ) delete consumer;
>>>> }catch (CMSException& e) {}
>>>> consumer = NULL;
>>>>
>>>> // Close open resources.
>>>> try{
>>>> if( session != NULL ) session->close();
>>>> if( connection != NULL ) connection->close();
>>>> }catch (CMSException& e) {}
>>>>
>>>> try{
>>>> if( session != NULL ) delete session;
>>>> }catch (CMSException& e) {}
>>>> session = NULL;
>>>>
>>>> try{
>>>> if( connection != NULL ) delete connection;
>>>> }catch (CMSException& e) {}
>>>> connection = NULL;
>>>> }
>>>> };
>>>> void Produce()
>>>> {
>>>> HelloWorldProducer producer( 2 );
>>>> Thread producerThread( &producer );
>>>> producerThread.start();
>>>> producerThread.join();
>>>> }
>>>> void Consumer1()
>>>> {
>>>> HelloWorldConsumer consumer( 10000 );
>>>> Thread consumerThread( &consumer );
>>>> consumerThread.start();
>>>> consumerThread.join();
>>>> }
>>>> void Consumer2()
>>>> {
>>>> HelloWorldConsumer consumer( 10000 );
>>>> Thread consumerThread( &consumer );
>>>> consumerThread.start();
>>>> consumerThread.join();
>>>> }
>>>> int main(int argc, char* argv[])
>>>> {
>>>> Produce();
>>>> Consumer1();
>>>> Consumer2();
>>>> }
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>
Re: SOS
Posted by sgliu <sh...@sina.com>.
I want to send first, then receive, using a topic. Should I use the
setCMSExpiration() function? What can I do?
Adrian Co wrote:
>
> Try subscribing the consumers first before sending messages.
>
> int main(int argc, char* argv[])
> {
> Consumer1();
> Consumer2();
>
> Produce();
>
> }
>
>
>
> sgliu wrote:
>> Please help me.
>>
>>
>> sgliu wrote:
>>
>>> I wish I producer message A, and then I exit the producer program. Then
>>> I
>>> start two consumer program(one is C1,the other is C2) at same time.C1
>>> can
>>> receive A , C2 can receive A.
>>>
>>> #include <activemq/concurrent/Thread.h>
>>> #include <activemq/concurrent/Runnable.h>
>>> #include <activemq/core/ActiveMQConnectionFactory.h>
>>> #include <activemq/util/Integer.h>
>>> #include <cms/Connection.h>
>>> #include <cms/Session.h>
>>> #include <cms/TextMessage.h>
>>> #include <cms/ExceptionListener.h>
>>> #include <cms/MessageListener.h>
>>> #include <stdlib.h>
>>>
>>> using namespace activemq::core;
>>> using namespace activemq::util;
>>> using namespace activemq::concurrent;
>>> using namespace cms;
>>> using namespace std;
>>>
>>> class HelloWorldProducer : public Runnable {
>>> private:
>>>
>>> Connection* connection;
>>> Session* session;
>>> Topic* destination;
>>> MessageProducer* producer;
>>> int numMessages;
>>>
>>> public:
>>>
>>> HelloWorldProducer( int numMessages ){
>>> connection = NULL;
>>> session = NULL;
>>> destination = NULL;
>>> producer = NULL;
>>> this->numMessages = numMessages;
>>> }
>>>
>>> virtual ~HelloWorldProducer(){
>>> cleanup();
>>> }
>>>
>>> virtual void run() {
>>> try {
>>> string user,passwd,sID;
>>> user="default";
>>> passwd="";
>>> sID="lsgID";
>>> ActiveMQConnectionFactory* connectionFactory = new
>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>>>
>>> connection =
>>> connectionFactory->createConnection(user,passwd,sID);
>>> connection->start();
>>>
>>> string sss=connection->getClientId();
>>> cout << sss << endl;
>>>
>>> session = connection->createSession(
>>> Session::AUTO_ACKNOWLEDGE
>>> );
>>> destination = session->createTopic( "mytopic" );
>>>
>>> producer = session->createProducer( destination
>>> );
>>> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>>
>>> producer->setTimeToLive(100000000);
>>> string threadIdStr = Integer::toString( Thread::getId() );
>>>
>>> // Create a messages
>>> string text = (string)"Hello world! from thread " +
>>> threadIdStr;
>>>
>>> for( int ix=0; ix<numMessages; ++ix ){
>>> TextMessage* message = session->createTextMessage(
>>> text );
>>>
>>> string messageID="messageID";
>>> message->setCMSExpiration(10000000000);
>>> message->setCMSMessageId(messageID);
>>>
>>> // Tell the producer to send the message
>>> printf( "Sent message from thread %s\n", threadIdStr.c_str()
>>> );
>>> producer->send( message );
>>>
>>> delete message;
>>> }
>>>
>>> }catch ( CMSException& e ) {
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> private:
>>>
>>> void cleanup(){
>>>
>>> // Destroy resources.
>>> try{
>>> if( destination != NULL ) delete destination;
>>> }catch ( CMSException& e ) {}
>>> destination = NULL;
>>>
>>> try{
>>> if( producer != NULL ) delete producer;
>>> }catch ( CMSException& e ) {}
>>> producer = NULL;
>>>
>>> // Close open resources.
>>> try{
>>> if( session != NULL ) session->close();
>>> if( connection != NULL ) connection->close();
>>> }catch ( CMSException& e ) {}
>>>
>>> try{
>>> if( session != NULL ) delete session;
>>> }catch ( CMSException& e ) {}
>>> session = NULL;
>>>
>>> try{
>>> if( connection != NULL ) delete connection;
>>> }catch ( CMSException& e ) {}
>>> connection = NULL;
>>> }
>>> };
>>>
>>> class HelloWorldConsumer : public ExceptionListener,
>>> public MessageListener,
>>> public Runnable {
>>>
>>> private:
>>>
>>> Connection* connection;
>>> Session* session;
>>> Topic* destination;
>>> MessageConsumer* consumer;
>>> long waitMillis;
>>>
>>> public:
>>>
>>> HelloWorldConsumer( long waitMillis ){
>>> connection = NULL;
>>> session = NULL;
>>> destination = NULL;
>>> consumer = NULL;
>>> this->waitMillis = waitMillis;
>>> }
>>> virtual ~HelloWorldConsumer(){
>>> cleanup();
>>> }
>>>
>>> virtual void run() {
>>>
>>> try {
>>>
>>> string user,passwd,sID;
>>> user="default";
>>> passwd="";
>>> sID="lsgID";
>>> // Create a ConnectionFactory
>>> ActiveMQConnectionFactory* connectionFactory =
>>> new ActiveMQConnectionFactory(
>>> "tcp://localhost:61613",user,passwd,sID);
>>>
>>> // Create a Connection
>>> connection =
>>> connectionFactory->createConnection();//user,passwd,sID);
>>> delete connectionFactory;
>>> connection->start();
>>>
>>> connection->setExceptionListener(this);
>>>
>>> // Create a Session
>>> session = connection->createSession(
>>> Session::AUTO_ACKNOWLEDGE
>>> );
>>> destination = session->createTopic( "mytopic" );
>>> consumer = session->createDurableConsumer(
>>> destination , user , "",false);
>>>
>>> consumer->setMessageListener( this );
>>>
>>> Thread::sleep( waitMillis );
>>>
>>> } catch (CMSException& e) {
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> virtual void onMessage( const Message* message ){
>>>
>>> try
>>> {
>>> const TextMessage* textMessage =
>>> dynamic_cast< const TextMessage* >( message );
>>> string text = textMessage->getText();
>>> printf( "Received: %s\n", text.c_str() );
>>> } catch (CMSException& e) {
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> virtual void onException( const CMSException& ex ) {
>>> printf("JMS Exception occured. Shutting down client.\n");
>>> }
>>>
>>> private:
>>>
>>> void cleanup(){
>>>
>>> // Destroy resources.
>>> try{
>>> if( destination != NULL ) delete destination;
>>> }catch (CMSException& e) {}
>>> destination = NULL;
>>>
>>> try{
>>> if( consumer != NULL ) delete consumer;
>>> }catch (CMSException& e) {}
>>> consumer = NULL;
>>>
>>> // Close open resources.
>>> try{
>>> if( session != NULL ) session->close();
>>> if( connection != NULL ) connection->close();
>>> }catch (CMSException& e) {}
>>>
>>> try{
>>> if( session != NULL ) delete session;
>>> }catch (CMSException& e) {}
>>> session = NULL;
>>>
>>> try{
>>> if( connection != NULL ) delete connection;
>>> }catch (CMSException& e) {}
>>> connection = NULL;
>>> }
>>> };
>>> void Produce()
>>> {
>>> HelloWorldProducer producer( 2 );
>>> Thread producerThread( &producer );
>>> producerThread.start();
>>> producerThread.join();
>>> }
>>> void Consumer1()
>>> {
>>> HelloWorldConsumer consumer( 10000 );
>>> Thread consumerThread( &consumer );
>>> consumerThread.start();
>>> consumerThread.join();
>>> }
>>> void Consumer2()
>>> {
>>> HelloWorldConsumer consumer( 10000 );
>>> Thread consumerThread( &consumer );
>>> consumerThread.start();
>>> consumerThread.join();
>>> }
>>> int main(int argc, char* argv[])
>>> {
>>> Produce();
>>> Consumer1();
>>> Consumer2();
>>> }
>>>
>>>
>>
>>
>
>
>
--
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7500819
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
Re: SOS
Posted by Adrian Co <ac...@exist.com>.
Try subscribing the consumers first before sending messages.
int main(int argc, char* argv[])
{
Consumer1();
Consumer2();
Produce();
}
sgliu wrote:
> Please help me.
>
>
> sgliu wrote:
>
>> I want to produce message A and then exit the producer program. Then I
>> start two consumer programs (one is C1, the other is C2) at the same time.
>> C1 can receive A, and C2 can receive A.
>>
>> #include <activemq/concurrent/Thread.h>
>> #include <activemq/concurrent/Runnable.h>
>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> #include <activemq/util/Integer.h>
>> #include <cms/Connection.h>
>> #include <cms/Session.h>
>> #include <cms/TextMessage.h>
>> #include <cms/ExceptionListener.h>
>> #include <cms/MessageListener.h>
>> #include <stdlib.h>
>>
>> using namespace activemq::core;
>> using namespace activemq::util;
>> using namespace activemq::concurrent;
>> using namespace cms;
>> using namespace std;
>>
>> class HelloWorldProducer : public Runnable {
>> private:
>>
>> Connection* connection;
>> Session* session;
>> Topic* destination;
>> MessageProducer* producer;
>> int numMessages;
>>
>> public:
>>
>> HelloWorldProducer( int numMessages ){
>> connection = NULL;
>> session = NULL;
>> destination = NULL;
>> producer = NULL;
>> this->numMessages = numMessages;
>> }
>>
>> virtual ~HelloWorldProducer(){
>> cleanup();
>> }
>>
>> virtual void run() {
>> try {
>> string user,passwd,sID;
>> user="default";
>> passwd="";
>> sID="lsgID";
>> ActiveMQConnectionFactory* connectionFactory = new
>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>>
>> connection =
>> connectionFactory->createConnection(user,passwd,sID);
>> connection->start();
>>
>> string sss=connection->getClientId();
>> cout << sss << endl;
>>
>> session = connection->createSession( Session::AUTO_ACKNOWLEDGE
>> );
>> destination = session->createTopic( "mytopic" );
>>
>> producer = session->createProducer( destination );
>> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>
>> producer->setTimeToLive(100000000);
>> string threadIdStr = Integer::toString( Thread::getId() );
>>
>> // Create a messages
>> string text = (string)"Hello world! from thread " +
>> threadIdStr;
>>
>> for( int ix=0; ix<numMessages; ++ix ){
>> TextMessage* message = session->createTextMessage(
>> text );
>>
>> string messageID="messageID";
>> message->setCMSExpiration(10000000000);
>> message->setCMSMessageId(messageID);
>>
>> // Tell the producer to send the message
>> printf( "Sent message from thread %s\n", threadIdStr.c_str() );
>> producer->send( message );
>>
>> delete message;
>> }
>>
>> }catch ( CMSException& e ) {
>> e.printStackTrace();
>> }
>> }
>>
>> private:
>>
>> void cleanup(){
>>
>> // Destroy resources.
>> try{
>> if( destination != NULL ) delete destination;
>> }catch ( CMSException& e ) {}
>> destination = NULL;
>>
>> try{
>> if( producer != NULL ) delete producer;
>> }catch ( CMSException& e ) {}
>> producer = NULL;
>>
>> // Close open resources.
>> try{
>> if( session != NULL ) session->close();
>> if( connection != NULL ) connection->close();
>> }catch ( CMSException& e ) {}
>>
>> try{
>> if( session != NULL ) delete session;
>> }catch ( CMSException& e ) {}
>> session = NULL;
>>
>> try{
>> if( connection != NULL ) delete connection;
>> }catch ( CMSException& e ) {}
>> connection = NULL;
>> }
>> };
>>
>> class HelloWorldConsumer : public ExceptionListener,
>> public MessageListener,
>> public Runnable {
>>
>> private:
>>
>> Connection* connection;
>> Session* session;
>> Topic* destination;
>> MessageConsumer* consumer;
>> long waitMillis;
>>
>> public:
>>
>> HelloWorldConsumer( long waitMillis ){
>> connection = NULL;
>> session = NULL;
>> destination = NULL;
>> consumer = NULL;
>> this->waitMillis = waitMillis;
>> }
>> virtual ~HelloWorldConsumer(){
>> cleanup();
>> }
>>
>> virtual void run() {
>>
>> try {
>>
>> string user,passwd,sID;
>> user="default";
>> passwd="";
>> sID="lsgID";
>> // Create a ConnectionFactory
>> ActiveMQConnectionFactory* connectionFactory =
>> new ActiveMQConnectionFactory(
>> "tcp://localhost:61613",user,passwd,sID);
>>
>> // Create a Connection
>> connection =
>> connectionFactory->createConnection();//user,passwd,sID);
>> delete connectionFactory;
>> connection->start();
>>
>> connection->setExceptionListener(this);
>>
>> // Create a Session
>> session = connection->createSession( Session::AUTO_ACKNOWLEDGE
>> );
>> destination = session->createTopic( "mytopic" );
>> consumer = session->createDurableConsumer(
>> destination , user , "",false);
>>
>> consumer->setMessageListener( this );
>>
>> Thread::sleep( waitMillis );
>>
>> } catch (CMSException& e) {
>> e.printStackTrace();
>> }
>> }
>>
>> virtual void onMessage( const Message* message ){
>>
>> try
>> {
>> const TextMessage* textMessage =
>> dynamic_cast< const TextMessage* >( message );
>> string text = textMessage->getText();
>> printf( "Received: %s\n", text.c_str() );
>> } catch (CMSException& e) {
>> e.printStackTrace();
>> }
>> }
>>
>> virtual void onException( const CMSException& ex ) {
>> printf("JMS Exception occured. Shutting down client.\n");
>> }
>>
>> private:
>>
>> void cleanup(){
>>
>> // Destroy resources.
>> try{
>> if( destination != NULL ) delete destination;
>> }catch (CMSException& e) {}
>> destination = NULL;
>>
>> try{
>> if( consumer != NULL ) delete consumer;
>> }catch (CMSException& e) {}
>> consumer = NULL;
>>
>> // Close open resources.
>> try{
>> if( session != NULL ) session->close();
>> if( connection != NULL ) connection->close();
>> }catch (CMSException& e) {}
>>
>> try{
>> if( session != NULL ) delete session;
>> }catch (CMSException& e) {}
>> session = NULL;
>>
>> try{
>> if( connection != NULL ) delete connection;
>> }catch (CMSException& e) {}
>> connection = NULL;
>> }
>> };
>> void Produce()
>> {
>> HelloWorldProducer producer( 2 );
>> Thread producerThread( &producer );
>> producerThread.start();
>> producerThread.join();
>> }
>> void Consumer1()
>> {
>> HelloWorldConsumer consumer( 10000 );
>> Thread consumerThread( &consumer );
>> consumerThread.start();
>> consumerThread.join();
>> }
>> void Consumer2()
>> {
>> HelloWorldConsumer consumer( 10000 );
>> Thread consumerThread( &consumer );
>> consumerThread.start();
>> consumerThread.join();
>> }
>> int main(int argc, char* argv[])
>> {
>> Produce();
>> Consumer1();
>> Consumer2();
>> }
>>
>>
>
>
Re: SOS
Posted by sgliu <sh...@sina.com>.
Please help me.
sgliu wrote:
>
> I want to produce message A and then exit the producer program. Then I
> start two consumer programs (one is C1, the other is C2) at the same time.
> C1 can receive A, and C2 can receive A.
>
> #include <activemq/concurrent/Thread.h>
> #include <activemq/concurrent/Runnable.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Integer.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
>
> using namespace activemq::core;
> using namespace activemq::util;
> using namespace activemq::concurrent;
> using namespace cms;
> using namespace std;
>
> class HelloWorldProducer : public Runnable {
> private:
>
> Connection* connection;
> Session* session;
> Topic* destination;
> MessageProducer* producer;
> int numMessages;
>
> public:
>
> HelloWorldProducer( int numMessages ){
> connection = NULL;
> session = NULL;
> destination = NULL;
> producer = NULL;
> this->numMessages = numMessages;
> }
>
> virtual ~HelloWorldProducer(){
> cleanup();
> }
>
> virtual void run() {
> try {
> string user,passwd,sID;
> user="default";
> passwd="";
> sID="lsgID";
> ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>
> connection =
> connectionFactory->createConnection(user,passwd,sID);
> connection->start();
>
> string sss=connection->getClientId();
> cout << sss << endl;
>
> session = connection->createSession( Session::AUTO_ACKNOWLEDGE
> );
> destination = session->createTopic( "mytopic" );
>
> producer = session->createProducer( destination );
> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>
> producer->setTimeToLive(100000000);
> string threadIdStr = Integer::toString( Thread::getId() );
>
> // Create a messages
> string text = (string)"Hello world! from thread " +
> threadIdStr;
>
> for( int ix=0; ix<numMessages; ++ix ){
> TextMessage* message = session->createTextMessage(
> text );
>
> string messageID="messageID";
> message->setCMSExpiration(10000000000);
> message->setCMSMessageId(messageID);
>
> // Tell the producer to send the message
> printf( "Sent message from thread %s\n", threadIdStr.c_str() );
> producer->send( message );
>
> delete message;
> }
>
> }catch ( CMSException& e ) {
> e.printStackTrace();
> }
> }
>
> private:
>
> void cleanup(){
>
> // Destroy resources.
> try{
> if( destination != NULL ) delete destination;
> }catch ( CMSException& e ) {}
> destination = NULL;
>
> try{
> if( producer != NULL ) delete producer;
> }catch ( CMSException& e ) {}
> producer = NULL;
>
> // Close open resources.
> try{
> if( session != NULL ) session->close();
> if( connection != NULL ) connection->close();
> }catch ( CMSException& e ) {}
>
> try{
> if( session != NULL ) delete session;
> }catch ( CMSException& e ) {}
> session = NULL;
>
> try{
> if( connection != NULL ) delete connection;
> }catch ( CMSException& e ) {}
> connection = NULL;
> }
> };
>
> class HelloWorldConsumer : public ExceptionListener,
> public MessageListener,
> public Runnable {
>
> private:
>
> Connection* connection;
> Session* session;
> Topic* destination;
> MessageConsumer* consumer;
> long waitMillis;
>
> public:
>
> HelloWorldConsumer( long waitMillis ){
> connection = NULL;
> session = NULL;
> destination = NULL;
> consumer = NULL;
> this->waitMillis = waitMillis;
> }
> virtual ~HelloWorldConsumer(){
> cleanup();
> }
>
> virtual void run() {
>
> try {
>
> string user,passwd,sID;
> user="default";
> passwd="";
> sID="lsgID";
> // Create a ConnectionFactory
> ActiveMQConnectionFactory* connectionFactory =
> new ActiveMQConnectionFactory(
> "tcp://localhost:61613",user,passwd,sID);
>
> // Create a Connection
> connection =
> connectionFactory->createConnection();//user,passwd,sID);
> delete connectionFactory;
> connection->start();
>
> connection->setExceptionListener(this);
>
> // Create a Session
> session = connection->createSession( Session::AUTO_ACKNOWLEDGE
> );
> destination = session->createTopic( "mytopic" );
> consumer = session->createDurableConsumer(
> destination , user , "",false);
>
> consumer->setMessageListener( this );
>
> Thread::sleep( waitMillis );
>
> } catch (CMSException& e) {
> e.printStackTrace();
> }
> }
>
> virtual void onMessage( const Message* message ){
>
> try
> {
> const TextMessage* textMessage =
> dynamic_cast< const TextMessage* >( message );
> string text = textMessage->getText();
> printf( "Received: %s\n", text.c_str() );
> } catch (CMSException& e) {
> e.printStackTrace();
> }
> }
>
> virtual void onException( const CMSException& ex ) {
> printf("JMS Exception occured. Shutting down client.\n");
> }
>
> private:
>
> void cleanup(){
>
> // Destroy resources.
> try{
> if( destination != NULL ) delete destination;
> }catch (CMSException& e) {}
> destination = NULL;
>
> try{
> if( consumer != NULL ) delete consumer;
> }catch (CMSException& e) {}
> consumer = NULL;
>
> // Close open resources.
> try{
> if( session != NULL ) session->close();
> if( connection != NULL ) connection->close();
> }catch (CMSException& e) {}
>
> try{
> if( session != NULL ) delete session;
> }catch (CMSException& e) {}
> session = NULL;
>
> try{
> if( connection != NULL ) delete connection;
> }catch (CMSException& e) {}
> connection = NULL;
> }
> };
> void Produce()
> {
> HelloWorldProducer producer( 2 );
> Thread producerThread( &producer );
> producerThread.start();
> producerThread.join();
> }
> void Consumer1()
> {
> HelloWorldConsumer consumer( 10000 );
> Thread consumerThread( &consumer );
> consumerThread.start();
> consumerThread.join();
> }
> void Consumer2()
> {
> HelloWorldConsumer consumer( 10000 );
> Thread consumerThread( &consumer );
> consumerThread.start();
> consumerThread.join();
> }
> int main(int argc, char* argv[])
> {
> Produce();
> Consumer1();
> Consumer2();
> }
>
--
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7485492
Sent from the ActiveMQ - User mailing list archive at Nabble.com.