You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by sgliu <sh...@sina.com> on 2006/11/26 06:03:30 UTC

Message's live time

I konw the function Message::setCMSExpiration(long).I think it is live time
of message.
But follow code,10 second later,I receive message yet.
...
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
destination = session->createTopic( "mytopic" );
...
producer->setDeliveryMode( DeliveryMode::PERSISTANT );
...
string text = "Hello world!"
TextMessage* message = session->createTextMessage( text );
message->setCMSExpiration(10000);
producer->send( message );
delete message;

Why?
-- 
View this message in context: http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7544897
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Message's live time

Posted by Adrian Co <ac...@exist.com>.
Try:

producer->setTimeToLive(10000); //10 second



sgliu wrote:
> Please help me.
>
> sgliu wrote:
>   
>> I change to follow code.
>> Sorry.
>> 10 second later,message doesn't disappear.
>>
>> #include <time.h>
>> double getcurt()
>> {
>> 	time_t ltime;
>> 	time(&ltime);
>> 	double ddd;
>> 	ddd=ltime*1000; //second to millisecond
>> 	return ddd;
>> }
>> ...
>> double ttt=getcurt();
>> message->setCMSTimeStamp(ttt);
>> message->setCMSExpiration(ttt + 10000); //10 second
>> ...
>> follow code do not yet.
>> ...
>> double ttt=getcurt();
>> message->setCMSTimeStamp(ttt);
>> producer->setTimeToLive(ttt + 10000); //10 second
>> ...
>> I use visual c++ 2005
>>
>>
>>
>> nmittler wrote:
>>     
>>> Just to clear that up, I believe if you set TimeToLive to Tnow +
>>> timeToLive
>>> in the MessageProducer before sending a message, you will have the
>>> expiration time you're looking for.
>>>
>>> So something like this ...
>>>
>>> long Tnow = ... // whatever is appropriate for your compiler/OS ... just
>>> get
>>> the current time in milliseconds
>>> long timeToLive = 5000L; // 5 milliseconds
>>> producer->setTimeToLive(Tnow + timeToLive );
>>> ...
>>> producer->send(msg);
>>>
>>>
>>> Again, this is just a temporary work around.  We should have this fixed
>>> in
>>> trunk soon.
>>>
>>> Regards,
>>> Nate
>>>
>>> On 11/26/06, Nathan Mittler <na...@gmail.com> wrote:
>>>       
>>>> I just took a look at the code and it looks like the
>>>> ActiveMQProducer.sendmethod is overwriting the CMSExpiration in the
>>>> message with its timeToLive
>>>> value.  This is incorrect - it should set the expiry to (expiry +
>>>> timeToLive).  I've captured this in a JIRA issue:
>>>> https://issues.apache.org/activemq/browse/AMQCPP-14
>>>>
>>>> In the mean time, the CMS Expiration and TimeToLive serve basically the
>>>> same purpose.  I'm not sure exactly what results you're looking for, but
>>>> you
>>>> should be able to do essentially the same thing by using the TimeToLive
>>>> value in the ActiveMQProducer.
>>>>
>>>> Regards,
>>>> Nate
>>>>
>>>> On 11/26/06, sgliu <sh...@sina.com> wrote:
>>>>         
>>>>> I konw the function Message::setCMSExpiration(long).I think it is live
>>>>> time
>>>>> of message.
>>>>> But follow code,10 second later,I receive message yet.
>>>>> ...
>>>>> session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>>>>> destination = session->createTopic( "mytopic" );
>>>>> ...
>>>>> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>>>> ...
>>>>> string text = "Hello world!"
>>>>> TextMessage* message = session->createTextMessage( text );
>>>>> message->setCMSExpiration(10000);
>>>>> producer->send( message );
>>>>> delete message;
>>>>>
>>>>> Why?
>>>>> --
>>>>> View this message in context:
>>>>> http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7544897
>>>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>>>
>>>>>
>>>>>           
>>>       
>>     
>
>   


Re: Message's live time

Posted by Nathan Mittler <na...@gmail.com>.
Just so you know I haven't forgotten about you :) ...

I have replicated the problem and I believe it is an issue with the broker.
I'm going to dig into it more later.


On 11/30/06, sgliu <sh...@sina.com> wrote:
>
>
> Sorry,according to your indication,message doesn't disappear.
> I hope:
> send first,a few time later,I receive nothing. (a few time later,message
> will disappear itself.)
> follow source code:
> #include <activemq/concurrent/Thread.h>
> #include <activemq/concurrent/Runnable.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Integer.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
>
> using namespace activemq::core;
> using namespace activemq::util;
> using namespace activemq::concurrent;
> using namespace cms;
> using namespace std;
>
> #include <sys/timeb.h>
> unsigned long getcurt()
> {
>         struct timeb t;
>         ftime (&t);
>         unsigned long timeStamp = (t.time * 1000LL) + t.millitm;
>         return timeStamp;
> }
>
> class HelloWorldProducer : public Runnable {
> private:
>
>         Connection* connection;
>         Session* session;
>         Topic* destination;
>         MessageProducer* producer;
>         int numMessages;
>
> public:
>
>         HelloWorldProducer( int numMessages ){
>                 connection = NULL;
>         session = NULL;
>         destination = NULL;
>         producer = NULL;
>         this->numMessages = numMessages;
>         }
>
>         virtual ~HelloWorldProducer(){
>                 cleanup();
>         }
>
>     virtual void run() {
>         try {
>                         string user,passwd,sID;
>                         user="default";
>                         passwd="";
>                         sID="lsgID";
>             // Create a ConnectionFactory
>             ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>
>             // Create a Connection
>             connection =
> connectionFactory->createConnection(user,passwd,sID);
>             connection->start();
>
>                         string sss=connection->getClientId();
>                         cout << sss << endl;
>
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
> );
>                         destination = session->createTopic( "mytopic" );
>
>                         producer = session->createProducer( destination );
>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>
>                         unsigned long ttt=getcurt();
>                         producer->setTimeToLive( 10000);
>
>             // Create the Thread Id String
>             string threadIdStr = Integer::toString( Thread::getId() );
>
>             // Create a messages
>             string text = (string)"Hello world! from thread " +
> threadIdStr;
>
>             for( int ix=0; ix<numMessages; ++ix ){
>                     TextMessage* message = session->createTextMessage(
> text );
>
>         //                      message->setCMSTimeStamp(ttt);
>         //                      message->setCMSExpiration(ttt +
> 10000);         //消息到期时间
>         //                      string messageID="messageID";
>
>         //                      message->setCMSMessageId(messageID);            //消息ID
>                         //      producer->setTimeToLive(10000);
>
>
>                 // Tell the producer to send the message
>                     printf( "Sent message from thread %s\n",
> threadIdStr.c_str() );
>                         producer->send( message );
>
>                 delete message;
>             }
>
>         }catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
>     }
>
> private:
>
>     void cleanup(){
>
>                         // Destroy resources.
>                         try{
>                 if( destination != NULL ) delete destination;
>                         }catch ( CMSException& e ) {}
>                         destination = NULL;
>
>                         try{
>                     if( producer != NULL ) delete producer;
>                         }catch ( CMSException& e ) {}
>                         producer = NULL;
>
>                 // Close open resources.
>                 try{
>                         if( session != NULL ) session->close();
>                         if( connection != NULL ) connection->close();
>                         }catch ( CMSException& e ) {}
>
>                         try{
>                 if( session != NULL ) delete session;
>                         }catch ( CMSException& e ) {}
>                         session = NULL;
>
>             try{
>                 if( connection != NULL ) delete connection;
>                         }catch ( CMSException& e ) {}
>                 connection = NULL;
>     }
> };
>
> class HelloWorldConsumer : public ExceptionListener,
>                            public MessageListener,
>                            public Runnable {
>
> private:
>
>         Connection* connection;
>         Session* session;
>         Topic* destination;
>         MessageConsumer* consumer;
>         long waitMillis;
>
> public:
>
>         HelloWorldConsumer( long waitMillis ){
>                 connection = NULL;
>         session = NULL;
>         destination = NULL;
>         consumer = NULL;
>         this->waitMillis = waitMillis;
>         }
>     virtual ~HelloWorldConsumer(){
>         cleanup();
>     }
>
>     virtual void run() {
>
>         try {
>
>                         string user,passwd,sID;
>                         user="default";
>                         passwd="";
>                         sID="lsgID";
>             // Create a ConnectionFactory
>             ActiveMQConnectionFactory* connectionFactory =
>                 new ActiveMQConnectionFactory(
> "tcp://localhost:61613",user,passwd,sID);
>
>             // Create a Connection
>             connection =
> connectionFactory->createConnection();//user,passwd,sID);
>             delete connectionFactory;
>             connection->start();
>
>             connection->setExceptionListener(this);
>
>             // Create a Session
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
> );
>
>             // Create the destination (Topic or Queue)
>                         destination = session->createTopic(
> "mytopic?consumer.retroactive=true"
> );
>
>                         consumer = session->createDurableConsumer(
> destination , user ,
> "",false);
>
>             consumer->setMessageListener( this );
>
>             // Sleep while asynchronous messages come in.
>             Thread::sleep( waitMillis );
>
>         } catch (CMSException& e) {
>             e.printStackTrace();
>         }
>     }
>
>     virtual void onMessage( const Message* message ){
>
>         try
>         {
>             const TextMessage* textMessage =
>                 dynamic_cast< const TextMessage* >( message );
>             string text = textMessage->getText();
>             printf( "Received: %s\n", text.c_str() );
>         } catch (CMSException& e) {
>             e.printStackTrace();
>         }
>     }
>
>     virtual void onException( const CMSException& ex ) {
>         printf("JMS Exception occured.  Shutting down client.\n");
>     }
>
> private:
>
>     void cleanup(){
>
>                 // Destroy resources.
>                 try{
>                 if( destination != NULL ) delete destination;
>                 }catch (CMSException& e) {}
>                 destination = NULL;
>
>                 try{
>             if( consumer != NULL ) delete consumer;
>                 }catch (CMSException& e) {}
>                 consumer = NULL;
>
>                 // Close open resources.
>                 try{
>                         if( session != NULL ) session->close();
>                         if( connection != NULL ) connection->close();
>                 }catch (CMSException& e) {}
>
>         try{
>                 if( session != NULL ) delete session;
>                 }catch (CMSException& e) {}
>                 session = NULL;
>
>                 try{
>                 if( connection != NULL ) delete connection;
>                 }catch (CMSException& e) {}
>                 connection = NULL;
>     }
> };
>   void Produce()
> {
>      HelloWorldProducer producer( 2 );
>      Thread producerThread( &producer );
>      producerThread.start();
>      producerThread.join();
> }
> void Consumer()
> {
>       HelloWorldConsumer consumer( 5000 );
>       Thread consumerThread( &consumer );
>       consumerThread.start();
>       consumerThread.join();
> }
> int main(int argc, char* argv[])
> {
>    //   Produce();
>
>           cout << "Produce End." << endl;
>
>       Consumer();
> }
> ----------------------------------------
> I made follow two experiments:
> 1.  In HelloWorldProducer,add follow code,but 11 seconds later, I receive
> message yet.
> producer->setTimeToLive( 10000);
>
> 2. In HelloWorldProducer,add follow code,but 11 seconds later, I receive
> message yet.
> unsigned long ttt=getcurt();
> producer->setTimeToLive( ttt + 10000);
>
> Each time, at first,I have commented out the line:Consumer() in main
> function,and run program.
> Second,wait 20 seconds.
> Third,I have commented out the line:Produce() in main function,and run
> program.
> Both two experiment,I have received message.
>
> That message don't disappear itself.
> Why?
> Please help me.
> --
> View this message in context:
> http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7617317
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>

Re: Message's live time

Posted by sgliu <sh...@sina.com>.
Sorry,according to your indication,message doesn't disappear.
I hope: 
send first,a few time later,I receive nothing. (a few time later,message
will disappear itself.) 
follow source code:
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>

using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;

#include <sys/timeb.h>
unsigned long getcurt()
{
	struct timeb t;
	ftime (&t);
	unsigned long timeStamp = (t.time * 1000LL) + t.millitm;
	return timeStamp;
}

class HelloWorldProducer : public Runnable {
private:
	
	Connection* connection;
	Session* session;
	Topic* destination;
	MessageProducer* producer;
	int numMessages;

public:
	
	HelloWorldProducer( int numMessages ){
		connection = NULL;
    	session = NULL;
    	destination = NULL;
    	producer = NULL;
    	this->numMessages = numMessages;
	}
	
	virtual ~HelloWorldProducer(){
		cleanup();
	}
	
    virtual void run() {
        try {
			string user,passwd,sID;
			user="default";
			passwd="";
			sID="lsgID";
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);

            // Create a Connection
            connection =
connectionFactory->createConnection(user,passwd,sID);
            connection->start();

			string sss=connection->getClientId();
			cout << sss << endl;

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
			destination = session->createTopic( "mytopic" );

			producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::PERSISTANT );
            
			unsigned long ttt=getcurt();
			producer->setTimeToLive( 10000);

            // Create the Thread Id String
            string threadIdStr = Integer::toString( Thread::getId() );
            
            // Create a messages
            string text = (string)"Hello world! from thread " + threadIdStr;
            
            for( int ix=0; ix<numMessages; ++ix ){
	            TextMessage* message = session->createTextMessage( text );

	//			message->setCMSTimeStamp(ttt);
	//			message->setCMSExpiration(ttt + 10000);		//消息到期时间
	//			string messageID="messageID";
	//			message->setCMSMessageId(messageID);		//消息ID
			//	producer->setTimeToLive(10000);


    	        // Tell the producer to send the message
        	    printf( "Sent message from thread %s\n", threadIdStr.c_str() );
        		producer->send( message );
            	
            	delete message;
            }
			
        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }
    
private:

    void cleanup(){
    				
			// Destroy resources.
			try{                        
            	if( destination != NULL ) delete destination;
			}catch ( CMSException& e ) {}
			destination = NULL;
			
			try{
	            if( producer != NULL ) delete producer;
			}catch ( CMSException& e ) {}
			producer = NULL;
			
    		// Close open resources.
    		try{
    			if( session != NULL ) session->close();
    			if( connection != NULL ) connection->close();
			}catch ( CMSException& e ) {}

			try{
            	if( session != NULL ) delete session;
			}catch ( CMSException& e ) {}
			session = NULL;
			
            try{
            	if( connection != NULL ) delete connection;
			}catch ( CMSException& e ) {}
    		connection = NULL;
    }
};

class HelloWorldConsumer : public ExceptionListener, 
                           public MessageListener,
                           public Runnable {
	
private:
	
	Connection* connection;
	Session* session;
	Topic* destination;
	MessageConsumer* consumer;
	long waitMillis;
		
public: 

	HelloWorldConsumer( long waitMillis ){
		connection = NULL;
    	session = NULL;
    	destination = NULL;
    	consumer = NULL;
    	this->waitMillis = waitMillis;
	}
    virtual ~HelloWorldConsumer(){    	
    	cleanup();
    }
    
    virtual void run() {
    	    	
        try {

			string user,passwd,sID;
			user="default";
			passwd="";
			sID="lsgID";
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory = 
                new ActiveMQConnectionFactory(
"tcp://localhost:61613",user,passwd,sID);

            // Create a Connection
            connection =
connectionFactory->createConnection();//user,passwd,sID);
            delete connectionFactory;
            connection->start();
            
            connection->setExceptionListener(this);

            // Create a Session
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);

            // Create the destination (Topic or Queue)
			destination = session->createTopic( "mytopic?consumer.retroactive=true"
);

			consumer = session->createDurableConsumer( destination , user ,
"",false);
            
            consumer->setMessageListener( this );
            
            // Sleep while asynchronous messages come in.
            Thread::sleep( waitMillis );		
            
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
    
    virtual void onMessage( const Message* message ){
    	
        try
        {
    	    const TextMessage* textMessage = 
                dynamic_cast< const TextMessage* >( message );
            string text = textMessage->getText();
            printf( "Received: %s\n", text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    virtual void onException( const CMSException& ex ) {
        printf("JMS Exception occured.  Shutting down client.\n");
    }
    
private:

    void cleanup(){
    	
		// Destroy resources.
		try{                        
        	if( destination != NULL ) delete destination;
		}catch (CMSException& e) {}
		destination = NULL;
		
		try{
            if( consumer != NULL ) delete consumer;
		}catch (CMSException& e) {}
		consumer = NULL;
		
		// Close open resources.
		try{
			if( session != NULL ) session->close();
			if( connection != NULL ) connection->close();
		}catch (CMSException& e) {}
		
        try{
        	if( session != NULL ) delete session;
		}catch (CMSException& e) {}
		session = NULL;
		
		try{
        	if( connection != NULL ) delete connection;
		}catch (CMSException& e) {}
		connection = NULL;
    }
};
  void Produce() 
 { 
     HelloWorldProducer producer( 2 ); 
     Thread producerThread( &producer ); 
     producerThread.start(); 
     producerThread.join(); 
 } 
 void Consumer() 
 { 
      HelloWorldConsumer consumer( 5000 ); 
      Thread consumerThread( &consumer ); 
      consumerThread.start(); 
      consumerThread.join(); 
 } 
 int main(int argc, char* argv[]) 
 { 
   //   Produce(); 

	  cout << "Produce End." << endl;

      Consumer(); 
 }
----------------------------------------
I made follow two experiments:
1.  In HelloWorldProducer,add follow code,but 11 seconds later, I receive
message yet.
producer->setTimeToLive( 10000);

2. In HelloWorldProducer,add follow code,but 11 seconds later, I receive
message yet.
unsigned long ttt=getcurt();
producer->setTimeToLive( ttt + 10000);

Each time, at first,I have commented out the line:Consumer() in main
function,and run program.
Second,wait 20 seconds.
Third,I have commented out the line:Produce() in main function,and run
program.
Both two experiment,I have received message.

That message don't disappear itself.
Why? 
Please help me.
-- 
View this message in context: http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7617317
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Message's live time

Posted by Nathan Mittler <na...@gmail.com>.
1) In the producer code, you should be setting the TimeToLive (the line you
have commented out), not the Expiration in the message directly.  TTL =
currentTime + 1000, for example.  Remember, this is just a temporary
work-around, in the future it will be TTL = 1000, which will match the JMS
API.  Expiration is a field in the message that is set by the JMS provider.
When you send a message, the MessageProducer will overwrite your value for
Expiration with what it thinks it should be (currently the time to live
value).
2) Since you're using an asynchronous consumer your onMessage method will be
called back asynchronously ... as soon as the message is received on the
wire.  I think the best bet would be to use the current time as your
TimeToLive (TTL) value.  The odds of your message being sent to the broker,
processed, and then sent back to the consumer in a single millisecond are
pretty small.  If you still receive the message, it may be an issue with the
broker.

On 11/29/06, sgliu <sh...@sina.com> wrote:
>
>
> In Producer,follow code:
>
> ...
> #include <sys/timeb.h>
> unsigned long getcurt()
> {
>         struct timeb t;
>         ftime (&t);
>         unsigned long timeStamp = (t.time * 1000LL) + t.millitm;
>         return timeStamp;
> }
> ...
> destination = session->createTopic( "mytopic" );
> producer = session->createProducer( destination );
> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
> ...
> unsigned long ttt=getcurt();
> message->setCMSTimeStamp(ttt);
> message->setCMSExpiration(ttt + 10000);
> //producer->setTimeToLive(10000);
> producer->send( message );
> ...
>
> In Consumer ,follow code:
> ...
> ... ...
>     virtual void run() {
>
>         try {
>
>                         string user,passwd,sID;
>                         user="default";
>                         passwd="";
>                         sID="lsgID";
>             // Create a ConnectionFactory
>             ActiveMQConnectionFactory* connectionFactory =
>                 new ActiveMQConnectionFactory(
> "tcp://localhost:61613",user,passwd,sID);
>
>             // Create a Connection
>             connection =
> connectionFactory->createConnection();//user,passwd,sID);
>             delete connectionFactory;
>             connection->start();
>
>             connection->setExceptionListener(this);
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
> );
>                         destination = session->createTopic(
> "mytopic?consumer.retroactive=true"
> );
>
>                         consumer = session->createDurableConsumer(
> destination , user ,
> "",false);
>
>             consumer->setMessageListener( this );
>             Thread::sleep( waitMillis );
>
>         } catch (CMSException& e) {
>             e.printStackTrace();
>         }
>     }
>
>     virtual void onMessage( const Message* message ){
>
>         try
>         {
>             const TextMessage* textMessage =
>                 dynamic_cast< const TextMessage* >( message );
>             string text = textMessage->getText();
>             printf( "Received: %s\n", text.c_str() );
>         } catch (CMSException& e) {
>             e.printStackTrace();
>         }
>     }
>
>     virtual void onException( const CMSException& ex ) {
>         printf("JMS Exception occured.  Shutting down client.\n");
>     }
> ... ...
>
> I hope:
> send first,a few time later,I receive nothing.
> (a few time later,message will disappear itself.)
>
> (I use visual C++ 2005)
> Please help me.
>
>
>
> I guess I'm not sure exactly what you're trying to do.  I don't see a
> consumer in the code snippets.  Is your consumer a C++ or Java client?
> Are you using an asynchronous consumer through the MessageListener
> interface, or are you calling receive() after a timed wait?
>
> --
> View this message in context:
> http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7597066
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>

RE: Message's live time

Posted by sgliu <sh...@sina.com>.
In Producer,follow code:

...
#include <sys/timeb.h>
unsigned long getcurt()
{
	struct timeb t;
	ftime (&t);
	unsigned long timeStamp = (t.time * 1000LL) + t.millitm;
	return timeStamp;
}
...
destination = session->createTopic( "mytopic" );
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::PERSISTANT );
...
unsigned long ttt=getcurt();
message->setCMSTimeStamp(ttt);
message->setCMSExpiration(ttt + 10000);
//producer->setTimeToLive(10000);
producer->send( message );
...

In Consumer ,follow code:
...
... ...
    virtual void run() {
    	    	
        try {

			string user,passwd,sID;
			user="default";
			passwd="";
			sID="lsgID";
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory = 
                new ActiveMQConnectionFactory(
"tcp://localhost:61613",user,passwd,sID);

            // Create a Connection
            connection =
connectionFactory->createConnection();//user,passwd,sID);
            delete connectionFactory;
            connection->start();
            
            connection->setExceptionListener(this);
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
			destination = session->createTopic( "mytopic?consumer.retroactive=true"
);

			consumer = session->createDurableConsumer( destination , user ,
"",false);
            
            consumer->setMessageListener( this );
            Thread::sleep( waitMillis );		
            
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
    
    virtual void onMessage( const Message* message ){
    	
        try
        {
    	    const TextMessage* textMessage = 
                dynamic_cast< const TextMessage* >( message );
            string text = textMessage->getText();
            printf( "Received: %s\n", text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    virtual void onException( const CMSException& ex ) {
        printf("JMS Exception occured.  Shutting down client.\n");
    }
... ...

I hope:
send first,a few time later,I receive nothing.
(a few time later,message will disappear itself.)

(I use visual C++ 2005)
Please help me.



I guess I'm not sure exactly what you're trying to do.  I don't see a
consumer in the code snippets.  Is your consumer a C++ or Java client?
Are you using an asynchronous consumer through the MessageListener
interface, or are you calling receive() after a timed wait?

-- 
View this message in context: http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7597066
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Message's live time

Posted by sgliu <sh...@sina.com>.
Please help me.

sgliu wrote:
> 
> I change to follow code.
> Sorry.
> 10 second later,message doesn't disappear.
> 
> #include <time.h>
> double getcurt()
> {
> 	time_t ltime;
> 	time(&ltime);
> 	double ddd;
> 	ddd=ltime*1000; //second to millisecond
> 	return ddd;
> }
> ...
> double ttt=getcurt();
> message->setCMSTimeStamp(ttt);
> message->setCMSExpiration(ttt + 10000); //10 second
> ...
> follow code do not yet.
> ...
> double ttt=getcurt();
> message->setCMSTimeStamp(ttt);
> producer->setTimeToLive(ttt + 10000); //10 second
> ...
> I use visual c++ 2005
> 
> 
> 
> nmittler wrote:
>> 
>> Just to clear that up, I believe if you set TimeToLive to Tnow +
>> timeToLive
>> in the MessageProducer before sending a message, you will have the
>> expiration time you're looking for.
>> 
>> So something like this ...
>> 
>> long Tnow = ... // whatever is appropriate for your compiler/OS ... just
>> get
>> the current time in milliseconds
>> long timeToLive = 5000L; // 5 milliseconds
>> producer->setTimeToLive(Tnow + timeToLive );
>> ...
>> producer->send(msg);
>> 
>> 
>> Again, this is just a temporary work around.  We should have this fixed
>> in
>> trunk soon.
>> 
>> Regards,
>> Nate
>> 
>> On 11/26/06, Nathan Mittler <na...@gmail.com> wrote:
>>>
>>> I just took a look at the code and it looks like the
>>> ActiveMQProducer.sendmethod is overwriting the CMSExpiration in the
>>> message with its timeToLive
>>> value.  This is incorrect - it should set the expiry to (expiry +
>>> timeToLive).  I've captured this in a JIRA issue:
>>> https://issues.apache.org/activemq/browse/AMQCPP-14
>>>
>>> In the mean time, the CMS Expiration and TimeToLive serve basically the
>>> same purpose.  I'm not sure exactly what results you're looking for, but
>>> you
>>> should be able to do essentially the same thing by using the TimeToLive
>>> value in the ActiveMQProducer.
>>>
>>> Regards,
>>> Nate
>>>
>>> On 11/26/06, sgliu <sh...@sina.com> wrote:
>>> >
>>> >
>>> > I konw the function Message::setCMSExpiration(long).I think it is live
>>> > time
>>> > of message.
>>> > But follow code,10 second later,I receive message yet.
>>> > ...
>>> > session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>>> > destination = session->createTopic( "mytopic" );
>>> > ...
>>> > producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>> > ...
>>> > string text = "Hello world!"
>>> > TextMessage* message = session->createTextMessage( text );
>>> > message->setCMSExpiration(10000);
>>> > producer->send( message );
>>> > delete message;
>>> >
>>> > Why?
>>> > --
>>> > View this message in context:
>>> > http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7544897
>>> > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>> >
>>> >
>>>
>> 
>> 
> 
> 

-- 
View this message in context: http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7570864
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Message's live time

Posted by sgliu <sh...@sina.com>.
I see.Thank you very much.


nmittler wrote:
> 
> Just to clear that up, I believe if you set TimeToLive to Tnow +
> timeToLive
> in the MessageProducer before sending a message, you will have the
> expiration time you're looking for.
> 
> So something like this ...
> 
> long Tnow = ... // whatever is appropriate for your compiler/OS ... just
> get
> the current time in milliseconds
> long timeToLive = 5000L; // 5 milliseconds
> producer->setTimeToLive(Tnow + timeToLive );
> ...
> producer->send(msg);
> 
> 
> Again, this is just a temporary work around.  We should have this fixed in
> trunk soon.
> 
> Regards,
> Nate
> 
> On 11/26/06, Nathan Mittler <na...@gmail.com> wrote:
>>
>> I just took a look at the code and it looks like the
>> ActiveMQProducer.sendmethod is overwriting the CMSExpiration in the
>> message with its timeToLive
>> value.  This is incorrect - it should set the expiry to (expiry +
>> timeToLive).  I've captured this in a JIRA issue:
>> https://issues.apache.org/activemq/browse/AMQCPP-14
>>
>> In the mean time, the CMS Expiration and TimeToLive serve basically the
>> same purpose.  I'm not sure exactly what results you're looking for, but
>> you
>> should be able to do essentially the same thing by using the TimeToLive
>> value in the ActiveMQProducer.
>>
>> Regards,
>> Nate
>>
>> On 11/26/06, sgliu <sh...@sina.com> wrote:
>> >
>> >
>> > I konw the function Message::setCMSExpiration(long).I think it is live
>> > time
>> > of message.
>> > But follow code,10 second later,I receive message yet.
>> > ...
>> > session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>> > destination = session->createTopic( "mytopic" );
>> > ...
>> > producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>> > ...
>> > string text = "Hello world!"
>> > TextMessage* message = session->createTextMessage( text );
>> > message->setCMSExpiration(10000);
>> > producer->send( message );
>> > delete message;
>> >
>> > Why?
>> > --
>> > View this message in context:
>> > http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7544897
>> > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >
>> >
>>
> 
> 

-- 
View this message in context: http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7555914
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Message's live time

Posted by Nathan Mittler <na...@gmail.com>.
Just to clear that up, I believe if you set TimeToLive to Tnow + timeToLive
in the MessageProducer before sending a message, you will have the
expiration time you're looking for.

So something like this ...

long Tnow = ... // whatever is appropriate for your compiler/OS ... just get
the current time in milliseconds
long timeToLive = 5000L; // 5 milliseconds
producer->setTimeToLive(Tnow + timeToLive );
...
producer->send(msg);


Again, this is just a temporary work around.  We should have this fixed in
trunk soon.

Regards,
Nate

On 11/26/06, Nathan Mittler <na...@gmail.com> wrote:
>
> I just took a look at the code and it looks like the ActiveMQProducer.sendmethod is overwriting the CMSExpiration in the message with its timeToLive
> value.  This is incorrect - it should set the expiry to (expiry +
> timeToLive).  I've captured this in a JIRA issue:
> https://issues.apache.org/activemq/browse/AMQCPP-14
>
> In the mean time, the CMS Expiration and TimeToLive serve basically the
> same purpose.  I'm not sure exactly what results you're looking for, but you
> should be able to do essentially the same thing by using the TimeToLive
> value in the ActiveMQProducer.
>
> Regards,
> Nate
>
> On 11/26/06, sgliu <sh...@sina.com> wrote:
> >
> >
> > I konw the function Message::setCMSExpiration(long).I think it is live
> > time
> > of message.
> > But follow code,10 second later,I receive message yet.
> > ...
> > session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
> > destination = session->createTopic( "mytopic" );
> > ...
> > producer->setDeliveryMode( DeliveryMode::PERSISTANT );
> > ...
> > string text = "Hello world!"
> > TextMessage* message = session->createTextMessage( text );
> > message->setCMSExpiration(10000);
> > producer->send( message );
> > delete message;
> >
> > Why?
> > --
> > View this message in context:
> > http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7544897
> > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >
> >
>

Re: Message's live time

Posted by Nathan Mittler <na...@gmail.com>.
I just took a look at the code and it looks like the
ActiveMQProducer.sendmethod is overwriting the CMSExpiration in the
message with its timeToLive
value.  This is incorrect - it should set the expiry to (expiry +
timeToLive).  I've captured this in a JIRA issue:
https://issues.apache.org/activemq/browse/AMQCPP-14

In the mean time, the CMS Expiration and TimeToLive serve basically the same
purpose.  I'm not sure exactly what results you're looking for, but you
should be able to do essentially the same thing by using the TimeToLive
value in the ActiveMQProducer.

Regards,
Nate

On 11/26/06, sgliu <sh...@sina.com> wrote:
>
>
> I konw the function Message::setCMSExpiration(long).I think it is live
> time
> of message.
> But follow code,10 second later,I receive message yet.
> ...
> session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
> destination = session->createTopic( "mytopic" );
> ...
> producer->setDeliveryMode( DeliveryMode::PERSISTANT );
> ...
> string text = "Hello world!"
> TextMessage* message = session->createTextMessage( text );
> message->setCMSExpiration(10000);
> producer->send( message );
> delete message;
>
> Why?
> --
> View this message in context:
> http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7544897
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>