You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Andrew M <an...@oc384.net> on 2008/03/18 15:20:53 UTC

Retroactive consumer...yes, no, maybe so?

Does anyone have an example of how to make a retroactive consumer work?
Thanks,
Andrew


-----Original Message-----
From: Andrew M [mailto:andrew@oc384.net] 
Sent: Friday, March 07, 2008 5:05 PM
To: users@activemq.apache.org
Subject: RE: Retroactive consumer not working...

Does anyone have an example of working retroactive consumer code?

Thanks,
Andrew




-----Original Message-----
From: Andrew [mailto:andrew@oc384.net] 
Sent: Wednesday, March 05, 2008 2:40 PM
To: users@activemq.apache.org
Subject: Retroactive consumer not working...

My broker is not feeding my consumer the messages from the retroactive queue
when the consumer connects.  The producer puts in a 10 min (600000ms) TTL so
I would think when my consumer reconnects it should receive the last 10 mins
of msgs.  Otherwise things appear fine, new msgs are received, etc... any
ideas?


On the producer.......

    private Session getActiveMqSession() throws JMSException {
        String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" +
ACTIVE_MQ_PORT +
"?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
        ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);
        connection = connectionFactory.createConnection();
        ((ActiveMQConnection)connection).setUseAsyncSend(false);
        connection.start();
        return connection.createSession(false, Session.SESSION_TRANSACTED); 
    }
    
    Session session = getActiveMqSession();
    
    void send(Object a) throws blah blah blah {
        Destination destination = session.createQueue(consumerName);
        producer = session.createProducer(destination);
        ObjectMessage m = session.createObjectMessage();
        m.setObject(a);
        //10 min TTL
        ((ActiveMQMessageProducer)producer).send(m, DeliveryMode.PERSISTENT,
Message.DEFAULT_PRIORITY, 600000L); 
    }
    

...and on the Consumer...  
  
    Session session;
    
    public void run() {
    	String url =
"failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconn
ectAttempts=0";
	ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);
      Connection connection = connectionFactory.createConnection();
      connection.start();
      connection.setExceptionListener(this);
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   }
		
   public void subscribe(String destName, MessageListener l) throws
JMSException {
      destName = destName + "?consumer.retroactive=true";
      MessageConsumer mc =
session.createConsumer(session.createQueue(destName));
      mc.setMessageListener(l);
    }
                


                    
                      

 
-----Original Message-----
From: Andrew [mailto:andrew@oc384.net] 
Sent: Wednesday, March 05, 2008 9:49 AM
To: users@activemq.apache.org
Subject: RE: purging an ActiveMQ queue

Thanks I'll create it as retroactive.  Problem is, how can I tell the broker
to purge the msgs at midnight so I get only msgs from the current day?
Would I just schedule a messageConsumer.close() ?

Thanks,
Andrew


-----Original Message-----
From: James Strachan [mailto:james.strachan@gmail.com] 
Sent: Wednesday, March 05, 2008 12:17 AM
To: users@activemq.apache.org
Subject: Re: purging an ActiveMQ queue

On 04/03/2008, Andrew <an...@oc384.net> wrote:
> I would like to set up a topic or queue so that when a client connects to
>  the broker and subscribes to that topic he receives a refresh of all the
>  messages sent so far on that topic that day.  At midnight the broker
should
>  purge the day's messages.  What's the best way to do that?

For queues, messages stay on a queue until they expire. For topics,
normally you only get messages sent after you start subscribing
(unless you use persistent topic subscriptions) though ActiveMQ
supports Subscription Recovery for this kinda stuff...

http://activemq.apache.org/subscription-recovery-policy.html

-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com





activemq Command Line Tools missing?

Posted by Andrew M <an...@oc384.net>.
According to this:

 http://activemq.apache.org/activemq-command-line-tools-reference.html

there should be some batch files and shell scripts like list, shutdown,
query, etc... They appear to be missing from the 4.1.1 and 5.0.0 source and
bin archives.

I'm actually looking for a quick way to publish strings to a topic/queue
from the command line.  I could write a little java app that takes a URL and
a String as args... but is there something that already exists for this?

Thanks...




RE: Retroactive consumer...yes, no, maybe so?

Posted by Andrew M <an...@oc384.net>.
It seems that I'm getting the last messages but I'm not getting all of them
for the day.  I probably need to set some Recovery Policy options...
  http://activemq.apache.org/subscription-recovery-policy.html 
Can I do that in my java code instead of the XML?  What would the URL look
like?  I want to configure everything through java.  Are the available
Recovery Policies mutually exclusive or can multiple policies be applied
simultaneously?

Also, when using a retroactive consumer it would make sense to register the
listener before I call session.createConsumer but until the consumer is
instantiated, how can I set the listener?  A Catch 22?  This is my
consumer's method...  any suggestions to improve it?

    public void subscribe(String destName, MessageListener l) throws
JMSException {
        char c = destName.contains("?")?'&':'?';
        destName = destName + c + "consumer.retroactive=true";
        System.out.println("ActiveMqClient subscribe " + destName); 
        MessageConsumer mc = session.createConsumer(
session.createTopic(destName) );
        mc.setMessageListener(l);
    }

Thanks..
Andrew


-----Original Message-----
From: ammulder@gmail.com [mailto:ammulder@gmail.com] On Behalf Of Aaron
Mulder
Sent: Tuesday, March 18, 2008 12:54 PM
To: users@activemq.apache.org
Subject: Re: Retroactive consumer...yes, no, maybe so?

Are you sure destName doesn't already have some "?options" in it?  Are
you sure no other consumer gets the messages off the queue before your
retroactive consumer?  Are you sure you shouldn't set the message
listener on the message consumer before registering it as a consumer?

Thanks,
       Aaron

On Tue, Mar 18, 2008 at 1:19 PM, Andrew M <an...@oc384.net> wrote:
> Aaron,
>  My original producer and consumer code are at the bottom of the msg.
Note
>  the subscribe method appends retroactive=true.
>  Thanks for any suggestions you may have..
>  Andrew
>
>
>
>  -----Original Message-----
>  From: ammulder@gmail.com [mailto:ammulder@gmail.com] On Behalf Of Aaron
>  Mulder
>  Sent: Tuesday, March 18, 2008 10:07 AM
>  To: users@activemq.apache.org
>
>
> Subject: Re: Retroactive consumer...yes, no, maybe so?
>
>  Do you want to post your example that's *not* working?  I last used
>  retroactive consumers probably 18 months ago, and they worked fine at
>  that time.  I was doing a network of brokers with fail-over, and if I
>  took one broker down and caused a consumer to fail over, it missed
>  messages during the fail-over operation.  With retroactive consumer
>  enabled, it didn't miss any messages (but got some duplicates) once it
>  failed over.  I don't have that code/configuration at hand, though --
>  just this from my notes:
>
>  topic = new
ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");
>
>  And I used this to set the retroactive consumers to receive the last
>  30s worth of messages, instead of the default (which I think at the
>  time was last 100):
>
>  <broker>
>   <destinationPolicy>
>     <policyMap>
>       <defaultEntry>
>         <policyEntry topic="*">
>           <subscriptionRecoveryPolicy>
>             <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
>           </subscriptionRecoveryPolicy>
>         </policyEntry>
>       </defaultEntry>
>     </policyMap>
>   </destinationPolicy>
>  </broker>
>
>  http://www.activemq.org/site/retroactive-consumer.html
>  http://www.activemq.org/site/subscription-recovery-policy.html
>
>  Thanks,
>        Aaron
>
>  On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <an...@oc384.net> wrote:
>
>  >
>
>
> >  -----Original Message-----
>  >  From: Andrew [mailto:andrew@oc384.net]
>  >  Sent: Wednesday, March 05, 2008 2:40 PM
>  >  To: users@activemq.apache.org
>  >  Subject: Retroactive consumer not working...
>  >
>  >  My broker is not feeding my consumer the messages from the retroactive
>  queue
>  >  when the consumer connects.  The producer puts in a 10 min (600000ms)
TTL
>  so
>  >  I would think when my consumer reconnects it should receive the last
10
>  mins
>  >  of msgs.  Otherwise things appear fine, new msgs are received, etc...
any
>  >  ideas?
>  >
>  >
>  >  On the producer.......
>  >
>  >     private Session getActiveMqSession() throws JMSException {
>  >         String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" +
>  >  ACTIVE_MQ_PORT +
>  >  "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>  >         ActiveMQConnectionFactory connectionFactory = new
>  >  ActiveMQConnectionFactory(url);
>  >         connection = connectionFactory.createConnection();
>  >         ((ActiveMQConnection)connection).setUseAsyncSend(false);
>  >         connection.start();
>  >         return connection.createSession(false,
>  Session.SESSION_TRANSACTED);
>  >     }
>  >
>  >     Session session = getActiveMqSession();
>  >
>  >     void send(Object a) throws blah blah blah {
>  >         Destination destination = session.createQueue(consumerName);
>  >         producer = session.createProducer(destination);
>  >         ObjectMessage m = session.createObjectMessage();
>  >         m.setObject(a);
>  >         //10 min TTL
>  >         ((ActiveMQMessageProducer)producer).send(m,
>  DeliveryMode.PERSISTENT,
>  >  Message.DEFAULT_PRIORITY, 600000L);
>  >     }
>  >
>  >
>  >  ...and on the Consumer...
>  >
>  >     Session session;
>  >
>  >     public void run() {
>  >         String url =
>  >
>
"failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconn
>  >  ectAttempts=0";
>  >         ActiveMQConnectionFactory connectionFactory = new
>  >  ActiveMQConnectionFactory(url);
>  >       Connection connection = connectionFactory.createConnection();
>  >       connection.start();
>  >       connection.setExceptionListener(this);
>  >       session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
>  >    }
>  >
>  >    public void subscribe(String destName, MessageListener l) throws
>  >  JMSException {
>  >       destName = destName + "?consumer.retroactive=true";
>  >       MessageConsumer mc =
>  >  session.createConsumer(session.createQueue(destName));
>  >       mc.setMessageListener(l);
>  >     }
>  >
>  >
>
>
>
>


Re: Retroactive consumer...yes, no, maybe so?

Posted by Aaron Mulder <am...@alumni.princeton.edu>.
Are you sure destName doesn't already have some "?options" in it?  Are
you sure no other consumer gets the messages off the queue before your
retroactive consumer?  Are you sure you shouldn't set the message
listener on the message consumer before registering it as a consumer?

Thanks,
       Aaron

On Tue, Mar 18, 2008 at 1:19 PM, Andrew M <an...@oc384.net> wrote:
> Aaron,
>  My original producer and consumer code are at the bottom of the msg.  Note
>  the subscribe method appends retroactive=true.
>  Thanks for any suggestions you may have..
>  Andrew
>
>
>
>  -----Original Message-----
>  From: ammulder@gmail.com [mailto:ammulder@gmail.com] On Behalf Of Aaron
>  Mulder
>  Sent: Tuesday, March 18, 2008 10:07 AM
>  To: users@activemq.apache.org
>
>
> Subject: Re: Retroactive consumer...yes, no, maybe so?
>
>  Do you want to post your example that's *not* working?  I last used
>  retroactive consumers probably 18 months ago, and they worked fine at
>  that time.  I was doing a network of brokers with fail-over, and if I
>  took one broker down and caused a consumer to fail over, it missed
>  messages during the fail-over operation.  With retroactive consumer
>  enabled, it didn't miss any messages (but got some duplicates) once it
>  failed over.  I don't have that code/configuration at hand, though --
>  just this from my notes:
>
>  topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");
>
>  And I used this to set the retroactive consumers to receive the last
>  30s worth of messages, instead of the default (which I think at the
>  time was last 100):
>
>  <broker>
>   <destinationPolicy>
>     <policyMap>
>       <defaultEntry>
>         <policyEntry topic="*">
>           <subscriptionRecoveryPolicy>
>             <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
>           </subscriptionRecoveryPolicy>
>         </policyEntry>
>       </defaultEntry>
>     </policyMap>
>   </destinationPolicy>
>  </broker>
>
>  http://www.activemq.org/site/retroactive-consumer.html
>  http://www.activemq.org/site/subscription-recovery-policy.html
>
>  Thanks,
>        Aaron
>
>  On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <an...@oc384.net> wrote:
>
>  >
>
>
> >  -----Original Message-----
>  >  From: Andrew [mailto:andrew@oc384.net]
>  >  Sent: Wednesday, March 05, 2008 2:40 PM
>  >  To: users@activemq.apache.org
>  >  Subject: Retroactive consumer not working...
>  >
>  >  My broker is not feeding my consumer the messages from the retroactive
>  queue
>  >  when the consumer connects.  The producer puts in a 10 min (600000ms) TTL
>  so
>  >  I would think when my consumer reconnects it should receive the last 10
>  mins
>  >  of msgs.  Otherwise things appear fine, new msgs are received, etc... any
>  >  ideas?
>  >
>  >
>  >  On the producer.......
>  >
>  >     private Session getActiveMqSession() throws JMSException {
>  >         String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" +
>  >  ACTIVE_MQ_PORT +
>  >  "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>  >         ActiveMQConnectionFactory connectionFactory = new
>  >  ActiveMQConnectionFactory(url);
>  >         connection = connectionFactory.createConnection();
>  >         ((ActiveMQConnection)connection).setUseAsyncSend(false);
>  >         connection.start();
>  >         return connection.createSession(false,
>  Session.SESSION_TRANSACTED);
>  >     }
>  >
>  >     Session session = getActiveMqSession();
>  >
>  >     void send(Object a) throws blah blah blah {
>  >         Destination destination = session.createQueue(consumerName);
>  >         producer = session.createProducer(destination);
>  >         ObjectMessage m = session.createObjectMessage();
>  >         m.setObject(a);
>  >         //10 min TTL
>  >         ((ActiveMQMessageProducer)producer).send(m,
>  DeliveryMode.PERSISTENT,
>  >  Message.DEFAULT_PRIORITY, 600000L);
>  >     }
>  >
>  >
>  >  ...and on the Consumer...
>  >
>  >     Session session;
>  >
>  >     public void run() {
>  >         String url =
>  >
>  "failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconn
>  >  ectAttempts=0";
>  >         ActiveMQConnectionFactory connectionFactory = new
>  >  ActiveMQConnectionFactory(url);
>  >       Connection connection = connectionFactory.createConnection();
>  >       connection.start();
>  >       connection.setExceptionListener(this);
>  >       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>  >    }
>  >
>  >    public void subscribe(String destName, MessageListener l) throws
>  >  JMSException {
>  >       destName = destName + "?consumer.retroactive=true";
>  >       MessageConsumer mc =
>  >  session.createConsumer(session.createQueue(destName));
>  >       mc.setMessageListener(l);
>  >     }
>  >
>  >
>
>
>
>

Re: Retroactive consumer...yes, no, maybe so?

Posted by Adrian Co <ac...@exist.com>.
As far as I know, retroactive consumers only applies to topics. For 
queues, you should get all messages whether the consumer is online or 
offline. The example in the documentation could be wrong.

Andrew M wrote:
> Aaron,
> My original producer and consumer code are at the bottom of the msg.  Note
> the subscribe method appends retroactive=true.  
> Thanks for any suggestions you may have..
> Andrew
>
>
> -----Original Message-----
> From: ammulder@gmail.com [mailto:ammulder@gmail.com] On Behalf Of Aaron
> Mulder
> Sent: Tuesday, March 18, 2008 10:07 AM
> To: users@activemq.apache.org
> Subject: Re: Retroactive consumer...yes, no, maybe so?
>
> Do you want to post your example that's *not* working?  I last used
> retroactive consumers probably 18 months ago, and they worked fine at
> that time.  I was doing a network of brokers with fail-over, and if I
> took one broker down and caused a consumer to fail over, it missed
> messages during the fail-over operation.  With retroactive consumer
> enabled, it didn't miss any messages (but got some duplicates) once it
> failed over.  I don't have that code/configuration at hand, though --
> just this from my notes:
>
> topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");
>
> And I used this to set the retroactive consumers to receive the last
> 30s worth of messages, instead of the default (which I think at the
> time was last 100):
>
> <broker>
>   <destinationPolicy>
>     <policyMap>
>       <defaultEntry>
>         <policyEntry topic="*">
>           <subscriptionRecoveryPolicy>
>             <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
>           </subscriptionRecoveryPolicy>
>         </policyEntry>
>       </defaultEntry>
>     </policyMap>
>   </destinationPolicy>
> </broker>
>
> http://www.activemq.org/site/retroactive-consumer.html
> http://www.activemq.org/site/subscription-recovery-policy.html
>
> Thanks,
>        Aaron
>
> On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <an...@oc384.net> wrote:
>  
>   
>>  -----Original Message-----
>>  From: Andrew [mailto:andrew@oc384.net]
>>  Sent: Wednesday, March 05, 2008 2:40 PM
>>  To: users@activemq.apache.org
>>  Subject: Retroactive consumer not working...
>>
>>  My broker is not feeding my consumer the messages from the retroactive
>>     
> queue
>   
>>  when the consumer connects.  The producer puts in a 10 min (600000ms) TTL
>>     
> so
>   
>>  I would think when my consumer reconnects it should receive the last 10
>>     
> mins
>   
>>  of msgs.  Otherwise things appear fine, new msgs are received, etc... any
>>  ideas?
>>
>>
>>  On the producer.......
>>
>>     private Session getActiveMqSession() throws JMSException {
>>         String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" +
>>  ACTIVE_MQ_PORT +
>>  "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>>         ActiveMQConnectionFactory connectionFactory = new
>>  ActiveMQConnectionFactory(url);
>>         connection = connectionFactory.createConnection();
>>         ((ActiveMQConnection)connection).setUseAsyncSend(false);
>>         connection.start();
>>         return connection.createSession(false,
>>     
> Session.SESSION_TRANSACTED);
>   
>>     }
>>
>>     Session session = getActiveMqSession();
>>
>>     void send(Object a) throws blah blah blah {
>>         Destination destination = session.createQueue(consumerName);
>>         producer = session.createProducer(destination);
>>         ObjectMessage m = session.createObjectMessage();
>>         m.setObject(a);
>>         //10 min TTL
>>         ((ActiveMQMessageProducer)producer).send(m,
>>     
> DeliveryMode.PERSISTENT,
>   
>>  Message.DEFAULT_PRIORITY, 600000L);
>>     }
>>
>>
>>  ...and on the Consumer...
>>
>>     Session session;
>>
>>     public void run() {
>>         String url =
>>
>>     
> "failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconn
>   
>>  ectAttempts=0";
>>         ActiveMQConnectionFactory connectionFactory = new
>>  ActiveMQConnectionFactory(url);
>>       Connection connection = connectionFactory.createConnection();
>>       connection.start();
>>       connection.setExceptionListener(this);
>>       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>    }
>>
>>    public void subscribe(String destName, MessageListener l) throws
>>  JMSException {
>>       destName = destName + "?consumer.retroactive=true";
>>       MessageConsumer mc =
>>  session.createConsumer(session.createQueue(destName));
>>       mc.setMessageListener(l);
>>     }
>>
>>
>>     
>  
>
>
>   


RE: Retroactive consumer...yes, no, maybe so?

Posted by Andrew M <an...@oc384.net>.
Aaron,
My original producer and consumer code are at the bottom of the msg.  Note
the subscribe method appends retroactive=true.  
Thanks for any suggestions you may have..
Andrew


-----Original Message-----
From: ammulder@gmail.com [mailto:ammulder@gmail.com] On Behalf Of Aaron
Mulder
Sent: Tuesday, March 18, 2008 10:07 AM
To: users@activemq.apache.org
Subject: Re: Retroactive consumer...yes, no, maybe so?

Do you want to post your example that's *not* working?  I last used
retroactive consumers probably 18 months ago, and they worked fine at
that time.  I was doing a network of brokers with fail-over, and if I
took one broker down and caused a consumer to fail over, it missed
messages during the fail-over operation.  With retroactive consumer
enabled, it didn't miss any messages (but got some duplicates) once it
failed over.  I don't have that code/configuration at hand, though --
just this from my notes:

topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");

And I used this to set the retroactive consumers to receive the last
30s worth of messages, instead of the default (which I think at the
time was last 100):

<broker>
  <destinationPolicy>
    <policyMap>
      <defaultEntry>
        <policyEntry topic="*">
          <subscriptionRecoveryPolicy>
            <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
          </subscriptionRecoveryPolicy>
        </policyEntry>
      </defaultEntry>
    </policyMap>
  </destinationPolicy>
</broker>

http://www.activemq.org/site/retroactive-consumer.html
http://www.activemq.org/site/subscription-recovery-policy.html

Thanks,
       Aaron

On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <an...@oc384.net> wrote:
 
>
>  -----Original Message-----
>  From: Andrew [mailto:andrew@oc384.net]
>  Sent: Wednesday, March 05, 2008 2:40 PM
>  To: users@activemq.apache.org
>  Subject: Retroactive consumer not working...
>
>  My broker is not feeding my consumer the messages from the retroactive
queue
>  when the consumer connects.  The producer puts in a 10 min (600000ms) TTL
so
>  I would think when my consumer reconnects it should receive the last 10
mins
>  of msgs.  Otherwise things appear fine, new msgs are received, etc... any
>  ideas?
>
>
>  On the producer.......
>
>     private Session getActiveMqSession() throws JMSException {
>         String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" +
>  ACTIVE_MQ_PORT +
>  "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>         ActiveMQConnectionFactory connectionFactory = new
>  ActiveMQConnectionFactory(url);
>         connection = connectionFactory.createConnection();
>         ((ActiveMQConnection)connection).setUseAsyncSend(false);
>         connection.start();
>         return connection.createSession(false,
Session.SESSION_TRANSACTED);
>     }
>
>     Session session = getActiveMqSession();
>
>     void send(Object a) throws blah blah blah {
>         Destination destination = session.createQueue(consumerName);
>         producer = session.createProducer(destination);
>         ObjectMessage m = session.createObjectMessage();
>         m.setObject(a);
>         //10 min TTL
>         ((ActiveMQMessageProducer)producer).send(m,
DeliveryMode.PERSISTENT,
>  Message.DEFAULT_PRIORITY, 600000L);
>     }
>
>
>  ...and on the Consumer...
>
>     Session session;
>
>     public void run() {
>         String url =
>
"failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconn
>  ectAttempts=0";
>         ActiveMQConnectionFactory connectionFactory = new
>  ActiveMQConnectionFactory(url);
>       Connection connection = connectionFactory.createConnection();
>       connection.start();
>       connection.setExceptionListener(this);
>       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>    }
>
>    public void subscribe(String destName, MessageListener l) throws
>  JMSException {
>       destName = destName + "?consumer.retroactive=true";
>       MessageConsumer mc =
>  session.createConsumer(session.createQueue(destName));
>       mc.setMessageListener(l);
>     }
>
>
 


Re: Retroactive consumer...yes, no, maybe so?

Posted by Aaron Mulder <am...@alumni.princeton.edu>.
Do you want to post your example that's *not* working?  I last used
retroactive consumers probably 18 months ago, and they worked fine at
that time.  I was doing a network of brokers with fail-over, and if I
took one broker down and caused a consumer to fail over, it missed
messages during the fail-over operation.  With retroactive consumer
enabled, it didn't miss any messages (but got some duplicates) once it
failed over.  I don't have that code/configuration at hand, though --
just this from my notes:

topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");

And I used this to set the retroactive consumers to receive the last
30s worth of messages, instead of the default (which I think at the
time was last 100):

<broker>
  <destinationPolicy>
    <policyMap>
      <defaultEntry>
        <policyEntry topic="*">
          <subscriptionRecoveryPolicy>
            <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
          </subscriptionRecoveryPolicy>
        </policyEntry>
      </defaultEntry>
    </policyMap>
  </destinationPolicy>
</broker>

http://www.activemq.org/site/retroactive-consumer.html
http://www.activemq.org/site/subscription-recovery-policy.html

Thanks,
       Aaron

On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <an...@oc384.net> wrote:
> Does anyone have an example of how to make a retroactive consumer work?
>  Thanks,
>  Andrew
>
>
>  -----Original Message-----
>  From: Andrew M [mailto:andrew@oc384.net]
>  Sent: Friday, March 07, 2008 5:05 PM
>  To: users@activemq.apache.org
>  Subject: RE: Retroactive consumer not working...
>
>  Does anyone have an example of working retroactive consumer code?
>
>  Thanks,
>  Andrew
>
>
>
>
>  -----Original Message-----
>  From: Andrew [mailto:andrew@oc384.net]
>  Sent: Wednesday, March 05, 2008 2:40 PM
>  To: users@activemq.apache.org
>  Subject: Retroactive consumer not working...
>
>  My broker is not feeding my consumer the messages from the retroactive queue
>  when the consumer connects.  The producer puts in a 10 min (600000ms) TTL so
>  I would think when my consumer reconnects it should receive the last 10 mins
>  of msgs.  Otherwise things appear fine, new msgs are received, etc... any
>  ideas?
>
>
>  On the producer.......
>
>     private Session getActiveMqSession() throws JMSException {
>         String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" +
>  ACTIVE_MQ_PORT +
>  "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>         ActiveMQConnectionFactory connectionFactory = new
>  ActiveMQConnectionFactory(url);
>         connection = connectionFactory.createConnection();
>         ((ActiveMQConnection)connection).setUseAsyncSend(false);
>         connection.start();
>         return connection.createSession(false, Session.SESSION_TRANSACTED);
>     }
>
>     Session session = getActiveMqSession();
>
>     void send(Object a) throws blah blah blah {
>         Destination destination = session.createQueue(consumerName);
>         producer = session.createProducer(destination);
>         ObjectMessage m = session.createObjectMessage();
>         m.setObject(a);
>         //10 min TTL
>         ((ActiveMQMessageProducer)producer).send(m, DeliveryMode.PERSISTENT,
>  Message.DEFAULT_PRIORITY, 600000L);
>     }
>
>
>  ...and on the Consumer...
>
>     Session session;
>
>     public void run() {
>         String url =
>  "failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconn
>  ectAttempts=0";
>         ActiveMQConnectionFactory connectionFactory = new
>  ActiveMQConnectionFactory(url);
>       Connection connection = connectionFactory.createConnection();
>       connection.start();
>       connection.setExceptionListener(this);
>       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>    }
>
>    public void subscribe(String destName, MessageListener l) throws
>  JMSException {
>       destName = destName + "?consumer.retroactive=true";
>       MessageConsumer mc =
>  session.createConsumer(session.createQueue(destName));
>       mc.setMessageListener(l);
>     }
>
>
>
>
>
>
>
>  -----Original Message-----
>  From: Andrew [mailto:andrew@oc384.net]
>  Sent: Wednesday, March 05, 2008 9:49 AM
>  To: users@activemq.apache.org
>  Subject: RE: purging an ActiveMQ queue
>
>  Thanks I'll create it as retroactive.  Problem is, how can I tell the broker
>  to purge the msgs at midnight so I get only msgs from the current day?
>  Would I just schedule a messageConsumer.close() ?
>
>  Thanks,
>  Andrew
>
>
>  -----Original Message-----
>  From: James Strachan [mailto:james.strachan@gmail.com]
>  Sent: Wednesday, March 05, 2008 12:17 AM
>  To: users@activemq.apache.org
>  Subject: Re: purging an ActiveMQ queue
>
>  On 04/03/2008, Andrew <an...@oc384.net> wrote:
>  > I would like to set up a topic or queue so that when a client connects to
>  >  the broker and subscribes to that topic he receives a refresh of all the
>  >  messages sent so far on that topic that day.  At midnight the broker
>  should
>  >  purge the day's messages.  What's the best way to do that?
>
>  For queues, messages stay on a queue until they expire. For topics,
>  normally you only get messages sent after you start subscribing
>  (unless you use persistent topic subscriptions) though ActiveMQ
>  supports Subscription Recovery for this kinda stuff...
>
>  http://activemq.apache.org/subscription-recovery-policy.html
>
>  --
>  James
>  -------
>  http://macstrac.blogspot.com/
>
>  Open Source Integration
>  http://open.iona.com
>
>
>
>
>
>