Posted to users@activemq.apache.org by serious <se...@orange.fr> on 2011/05/30 20:22:12 UTC

Keeping the messages in a queue or topic during a day

Hello,

Here are two scenarios that illustrate the point:
- A logging system: some applications send exception messages to a
topic/queue, and a logging client listens to it. But each time the client
picks up a message, it is deleted from the queue, and if the client stops and
reconnects later it won't be able to see the previous error messages.
- A synchronization system: each time a data source is updated, it
notifies some caches that listen to a topic. But if a cache stops and
reconnects later, it will not be aware of the changes that occurred in the
meantime.

So in these two cases it would be useful if the messages could remain in
the queue for a predefined duration, such as a day.

Is there a way to accomplish this with ActiveMQ?

Thanks in advance for your help.

--
View this message in context: http://activemq.2283324.n4.nabble.com/Keeping-the-messages-in-a-queue-or-topic-during-a-day-tp3561343p3561343.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Keeping the messages in a queue or topic during a day

Posted by serious <se...@orange.fr>.
@Christian: thanks for your answer. This is indeed the current
implementation, but the subscribers need to be known in advance, so a new
one can't be added at the last minute to carry out the tasks.

I guess this scenario is not an expected use case for a queue, because it
is more a data-storage issue than a data-transfer one, so it should
logically be handled by a database. But depending on only one system
(ActiveMQ) instead of two (ActiveMQ + MySQL) seems simpler to implement
and administer, and more reliable.

Re: Keeping the messages in a queue or topic during a day

Posted by Christian Schneider <ch...@die-schneider.net>.
If you set up a durable subscriber for the logger, then all the messages
will pile up there. If you start the logger once a day, it will
receive all the events that have piled up.
So this should give you what you need.
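
To make this behaviour concrete, here is a minimal Java sketch of how a durable subscription accumulates messages while its subscriber is offline. It is only a toy in-memory model, not ActiveMQ or JMS code: the class DurableTopicModel and its methods register, publish and drain are names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of durable topic subscriptions (hypothetical, not ActiveMQ code). */
final class DurableTopicModel {
    // Pending messages, keyed by the name of each registered subscription.
    private final Map<String, List<String>> backlog = new LinkedHashMap<>();

    /** Registers a durable subscription; messages accumulate from this point on. */
    void register(String subscriptionName) {
        backlog.putIfAbsent(subscriptionName, new ArrayList<>());
    }

    /** Stores the message for every subscription that already exists. */
    void publish(String body) {
        for (List<String> pending : backlog.values()) {
            pending.add(body);
        }
    }

    /** Simulates the subscriber coming online: returns and clears its backlog. */
    List<String> drain(String subscriptionName) {
        List<String> pending = backlog.get(subscriptionName);
        if (pending == null) {
            return List.of();
        }
        List<String> delivered = new ArrayList<>(pending);
        pending.clear();
        return delivered;
    }

    public static void main(String[] args) {
        DurableTopicModel topic = new DurableTopicModel();
        topic.register("daily-logger");
        topic.publish("error 1");
        topic.publish("error 2");
        topic.register("late-logger"); // registered after two messages were sent
        topic.publish("error 3");
        System.out.println(topic.drain("daily-logger")); // [error 1, error 2, error 3]
        System.out.println(topic.drain("late-logger"));  // [error 3]
    }
}
```

In this model, as with real durable subscriptions, a subscription only collects messages published after it was registered, so a logger that is first registered at the end of the day would miss the earlier events.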

Christian

On 31.05.2011 11:33, serious wrote:
> @Gary Tully and Marcelo Jabali: thanks a lot for your answers.
>
> Indeed, durable topics handle the reliability issue caused by subscribers
> shutting down.
>
> I have another requirement: I'd like any new subscriber to receive all
> the messages posted to a queue/topic during the whole day.
> Here is a scenario for it: one of the loggers/caches is only launched at the
> end of the day and should be able to get all the errors/updates that have
> happened during the day in order to process them in bulk.
>
> How could this scenario be handled?
>
> Thanks.


-- 
--
Christian Schneider
http://www.liquid-reality.de

Apache CXF and Camel Architect
Talend Application Integration Division http://www.talend.com


Re: Keeping the messages in a queue or topic during a day

Posted by serious <se...@orange.fr>.
Thanks a lot, Mr Tully, this is the right solution.

So, to sum up:
- You first have to configure the broker by editing its "conf/activemq.xml"
configuration file:
[code]
<?xml version="1.0" encoding="UTF-8"?>
<beans...>
	<broker ...>
		<destinationPolicy>
			<policyMap>
				<policyEntries>
					...
					<policyEntry topic="tests.retro.>" producerFlowControl="true" memoryLimit="1mb">
						<subscriptionRecoveryPolicy>
							<timedSubscriptionRecoveryPolicy recoverDuration="3600000" />
						</subscriptionRecoveryPolicy>
					</policyEntry>
				</policyEntries>
			</policyMap>
		</destinationPolicy>
	</broker>
	...
</beans>
[/code]
The "recoverDuration" is expressed in milliseconds, so here the duration is 1 hour
(a full day would be 86400000).

- Then you should be able to use retroactive consumers (a C#/NMS sample):
[code]
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;

// Unique topic name that matches the "tests.retro.>" policy entry
string retroactiveTopicName = "tests.retro.test_" + Guid.NewGuid() + ".topic";

ConnectionFactory connectionFactory = new ConnectionFactory("tcp://localhost:61616");

var producerConnection = connectionFactory.CreateConnection();
producerConnection.Start();

var producerSession = (Session)producerConnection.CreateSession();
var producer = producerSession.CreateProducer();

// Send two messages before any consumer exists
var message = producer.CreateTextMessage("First message!");
producer.Send(producerSession.GetTopic(retroactiveTopicName), message);

message = producer.CreateTextMessage("Second message!");
producer.Send(producerSession.GetTopic(retroactiveTopicName), message);

var consumerConnection = connectionFactory.CreateConnection();
consumerConnection.Start();

var consumersSession = (Session)consumerConnection.CreateSession();

// Retroactive consumer: note the "?consumer.retroactive=true" destination option
var retroactiveConsumer = (MessageConsumer)consumersSession.CreateConsumer(
	consumersSession.GetTopic(retroactiveTopicName + "?consumer.retroactive=true"));
retroactiveConsumer.Listener += incomingMessage =>
{
	Console.WriteLine("Retroactive consumer in : {0}!", (incomingMessage as ITextMessage).Text);
};

// Ordinary consumer on the same topic
var nonRetroactiveConsumer = (MessageConsumer)consumersSession.CreateConsumer(
	consumersSession.GetTopic(retroactiveTopicName));
nonRetroactiveConsumer.Listener += incomingMessage =>
{
	Console.WriteLine("Non retroactive consumer in : {0}!", (incomingMessage as ITextMessage).Text);
};

// Send a third message while both consumers are listening
message = producer.CreateTextMessage("Third message!");
producer.Send(producerSession.GetTopic(retroactiveTopicName), message);

Console.Write("Press enter to exit...");
Console.ReadLine();
[/code]

Notice the "?consumer.retroactive=true" appended to the name of the topic to
make that consumer retroactive.

The retroactive consumer should receive all three messages; the other
one should receive only the third.
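
To illustrate what a timed recovery window combined with the retroactive flag amounts to, here is a small Java sketch (NMS being very close to Java+JMS). It is an in-memory toy model under my own assumptions, not ActiveMQ code: the class TimedRecoveryTopic, its methods and the explicit nowMs clock parameter are all hypothetical. It also prints the millisecond value that a one-day window would need.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a topic with a timed recovery window (hypothetical, not ActiveMQ code). */
final class TimedRecoveryTopic {
    /** A message body together with the time it was published. */
    private static final class Stamped {
        final long timestampMs;
        final String body;

        Stamped(long timestampMs, String body) {
            this.timestampMs = timestampMs;
            this.body = body;
        }
    }

    private final long recoverDurationMs;
    private final Deque<Stamped> recent = new ArrayDeque<>();
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    TimedRecoveryTopic(Duration recoverDuration) {
        this.recoverDurationMs = recoverDuration.toMillis();
    }

    /** Remembers the message for later replay and delivers it to current subscribers. */
    void publish(String body, long nowMs) {
        evictExpired(nowMs);
        recent.addLast(new Stamped(nowMs, body));
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(body);
        }
    }

    /** A retroactive subscriber is first replayed every message still inside the window. */
    void subscribe(Consumer<String> subscriber, boolean retroactive, long nowMs) {
        evictExpired(nowMs);
        if (retroactive) {
            for (Stamped message : recent) {
                subscriber.accept(message.body);
            }
        }
        subscribers.add(subscriber);
    }

    /** Drops messages older than the recovery window. */
    private void evictExpired(long nowMs) {
        while (!recent.isEmpty() && nowMs - recent.peekFirst().timestampMs > recoverDurationMs) {
            recent.removeFirst();
        }
    }

    public static void main(String[] args) {
        // One day expressed in milliseconds.
        System.out.println(Duration.ofDays(1).toMillis()); // 86400000

        TimedRecoveryTopic topic = new TimedRecoveryTopic(Duration.ofHours(1));
        topic.publish("First message!", 0L);
        topic.publish("Second message!", 1_000L);
        topic.subscribe(m -> System.out.println("retroactive: " + m), true, 2_000L);
        topic.subscribe(m -> System.out.println("plain: " + m), false, 2_000L);
        topic.publish("Third message!", 3_000L);
    }
}
```

In this sketch the retroactive subscriber prints all three messages and the plain one prints only the third, which mirrors the behaviour described above. A subscriber that joins after the window has elapsed would be replayed nothing.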


Hopefully this will help somebody else.

Thanks, all, for your valuable help.

Re: Keeping the messages in a queue or topic during a day

Posted by Gary Tully <ga...@gmail.com>.
Have a peek at the tests:

http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/

You need to configure a recovery policy on the destinations via the
broker XML configuration. The default policy is to not hold on to any
messages.

On 6 June 2011 11:53, serious <se...@orange.fr> wrote:
> @Gary: thanks for the pointer, this looks like the ideal solution.
>
> But I can't make it work; here is some code that does not behave as expected.
> (Note: this is C# using the NMS API, but it is very close to
> Java+JMS.)
>
> [code]
> // Generate a new unique topic name
> string retroactiveTopicName = "testRetro_" + Guid.NewGuid() + ".topic";
>
> ConnectionFactory connectionFactory = new ConnectionFactory("tcp://localhost:61616");
>
> var producerConnection = connectionFactory.CreateConnection();
> producerConnection.Start();
>
> var producerSession = (Session)producerConnection.CreateSession();
> var producer = producerSession.CreateProducer();
>
> // Send a first message
> var message = producer.CreateTextMessage("First message!");
> producer.Send(producerSession.GetTopic(retroactiveTopicName), message);
>
> // Send a second message
> message = producer.CreateTextMessage("Second message!") as ActiveMQTextMessage;
> producer.Send(producerSession.GetTopic(retroactiveTopicName), message);
>
> var consumerConnection = connectionFactory.CreateConnection();
> consumerConnection.Start();
>
> var consumerSession = (Session)consumerConnection.CreateSession();
>
> var consumer = (MessageConsumer)consumerSession.CreateConsumer(
> 	consumerSession.GetTopic(retroactiveTopicName + "?consumer.retroactive=true"));
> // Wait for incoming messages
> consumer.Listener += incomingMessage =>
> {
> 	Console.WriteLine("Consumer in : {0}!", (incomingMessage as ITextMessage).Text);
> };
>
> // Send a third message
> message = producer.CreateTextMessage("Third message!");
> producer.Send(producerSession.GetTopic(retroactiveTopicName), message);
>
> System.Console.Write("Press enter to exit...");
> System.Console.ReadLine();
> [/code]
>
> The producer sends three messages: two before the consumer is connected,
> one after.
> But the consumer only receives the third one, so the "retroactive" flag
> seems to be completely ignored.
>
> What am I doing wrong?
>
> Thanks.



-- 
http://fusesource.com
http://blog.garytully.com

Re: Keeping the messages in a queue or topic during a day

Posted by serious <se...@orange.fr>.
@Gary: thanks for the pointer, this looks like the ideal solution.

But I can't make it work; here is some code that does not behave as expected.
(Note: this is C# using the NMS API, but it is very close to
Java+JMS.)

[code]
// Generate a new unique topic name
string retroactiveTopicName = "testRetro_" + Guid.NewGuid() + ".topic";

ConnectionFactory connectionFactory = new ConnectionFactory("tcp://localhost:61616");

var producerConnection = connectionFactory.CreateConnection();
producerConnection.Start();

var producerSession = (Session)producerConnection.CreateSession();
var producer = producerSession.CreateProducer();

// Send a first message
var message = producer.CreateTextMessage("First message!");
producer.Send(producerSession.GetTopic(retroactiveTopicName), message);

// Send a second message
message = producer.CreateTextMessage("Second message!") as ActiveMQTextMessage;
producer.Send(producerSession.GetTopic(retroactiveTopicName), message);

var consumerConnection = connectionFactory.CreateConnection();
consumerConnection.Start();

var consumerSession = (Session)consumerConnection.CreateSession();

var consumer = (MessageConsumer)consumerSession.CreateConsumer(
	consumerSession.GetTopic(retroactiveTopicName + "?consumer.retroactive=true"));
// Wait for incoming messages
consumer.Listener += incomingMessage =>
{
	Console.WriteLine("Consumer in : {0}!", (incomingMessage as ITextMessage).Text);
};

// Send a third message
message = producer.CreateTextMessage("Third message!");
producer.Send(producerSession.GetTopic(retroactiveTopicName), message);

System.Console.Write("Press enter to exit...");
System.Console.ReadLine();
[/code]

The producer sends three messages: two before the consumer is connected,
one after.
But the consumer only receives the third one, so the "retroactive" flag
seems to be completely ignored.

What am I doing wrong?

Thanks.

Re: Keeping the messages in a queue or topic during a day

Posted by Gary Tully <ga...@gmail.com>.
Silence typically means folks are busy!
Have a look at http://activemq.apache.org/retroactive-consumer.html
and http://activemq.apache.org/subscription-recovery-policy.html


On 1 June 2011 09:15, serious <se...@orange.fr> wrote:
> I guess this silence means: "No, there is no way to do that with ActiveMQ" :)
>
> So, do you know of any other tool that could fit the above needs, without
> having to implement a new one from scratch?
>
> Thanks in advance.



-- 
http://fusesource.com
http://blog.garytully.com

Re: Keeping the messages in a queue or topic during a day

Posted by serious <se...@orange.fr>.
I guess this silence means: "No, there is no way to do that with ActiveMQ" :)

So, do you know of any other tool that could fit the above needs, without
having to implement a new one from scratch?

Thanks in advance.

Re: Keeping the messages in a queue or topic during a day

Posted by serious <se...@orange.fr>.
@Gary Tully and Marcelo Jabali: thanks a lot for your answers.

Indeed, durable topics handle the reliability issue caused by subscribers
shutting down.

I have another requirement: I'd like any new subscriber to receive all
the messages posted to a queue/topic during the whole day.
Here is a scenario for it: one of the loggers/caches is only launched at the
end of the day and should be able to get all the errors/updates that have
happened during the day in order to process them in bulk.

How could this scenario be handled?

Thanks.
