Posted to user@storm.apache.org by Michal Singer <mi...@leadspace.com> on 2014/01/02 08:34:39 UTC

RE: Guaranteeing message processing on Storm fails

Hi, thanks for this reply.

I have now read this chapter. I am trying the JMS example, which does
support the ack and fail methods.

I am still losing data, and I am checking to see if I can find the
problem.



Please let me know if anyone knows of issues with this example and
guaranteed message processing.

Thanks, Michal



*From:* Abhishek Bhattacharjee [mailto:abhishek.bhattacharjee11@gmail.com]
*Sent:* Tuesday, December 31, 2013 10:35 AM
*To:* user@storm.incubator.apache.org
*Subject:* Re: Guaranteeing message processing on Storm fails



Okay, so I think you are using storm-starter to check whether message
processing is guaranteed.

But I think you are not replaying the messages in the spout (I could be
wrong). Are you overriding the fail() method inside the spout?

The programmer must use the fail() method to replay the messages; it is
not done automatically by Storm.

Have a look at this for better understanding.

https://github.com/storm-book/examples-ch04-spouts/blob/master/src/main/java/banktransactions/TransactionsSpouts.java
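The replay pattern in the linked TransactionsSpouts example can be sketched as follows. This is an illustrative Python model of the mechanism, not the real Storm IRichSpout API; the class and field names are made up for the sketch:

```python
import collections

class ReplaySpout:
    """Toy model of a reliable spout: every emitted message is kept in
    a pending map keyed by its message ID until it is acked, and fail()
    puts it back at the front of the source so it is emitted again."""

    def __init__(self, messages):
        self.source = collections.deque(messages)  # (msg_id, msg) pairs
        self.pending = {}   # msg_id -> msg, awaiting ack
        self.emitted = []   # everything handed to the "topology"

    def next_tuple(self):
        if self.source:
            msg_id, msg = self.source.popleft()
            self.pending[msg_id] = msg   # remember until acked
            self.emitted.append((msg_id, msg))

    def ack(self, msg_id):
        self.pending.pop(msg_id, None)   # fully processed: forget it

    def fail(self, msg_id):
        # Storm only *calls* fail(); re-emitting is the spout's job.
        self.source.appendleft((msg_id, self.pending[msg_id]))
```

A spout that leaves fail() empty, as the storm-starter word-count spout does, simply drops the message.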



And read the 4th chapter of this book:

http://shop.oreilly.com/product/0636920024835.do



And if you are doing that already, then non-unique message IDs could be
the cause.



On Tue, Dec 31, 2013 at 12:29 PM, Michal Singer <mi...@leadspace.com>
wrote:

Thanks, I am checking this out. This might be the problem.

But there is still this issue:



UI – some streams are missing from the UI as a result of the worker being
killed or a rebalance.



*From:* Nathan Leung [mailto:ncleung@gmail.com]
*Sent:* Monday, December 30, 2013 6:35 PM
*To:* user


*Subject:* Re: Guaranteeing message processing on Storm fails



You are using the sentence as the message ID?  The word count example
repeats sentences, and your message IDs need to be unique.
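Nathan's point can be seen with a toy illustration (plain Python, not Storm code; the tracking map is hypothetical): if the sentence itself is the message ID, two in-flight copies of the same sentence collapse into a single tracking entry, so an ack or fail for either copy becomes ambiguous.

```python
# Toy model of spout-side tracking keyed by message ID.
pending = {}

def emit(sentence):
    # Using the sentence as its own ID: a repeated sentence silently
    # overwrites the earlier in-flight entry instead of adding one.
    pending[sentence] = sentence

emit("the cow jumped over the moon")
emit("the cow jumped over the moon")  # same sentence, same "ID"

# Two tuples are in flight, but only one is tracked.
```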



On Mon, Dec 30, 2013 at 2:05 AM, Michal Singer <mi...@leadspace.com> wrote:

In my test I am using the word counter that was in the code samples of
storm starter.

The first spout is the word reader, which reads lines from files and
sends them to the word normalizer, which sends the words in each line to
the word counter.

The spout emits: List<Object> tuple, Object messageId

The message id is the line.

The word normalizer emits: Collection<Tuple> anchors, List<Object> tuple,
and sends an ack at the end of the execute method.

The word counter sends an ack on the input it receives.
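The anchoring flow just described (a spout tuple fanning out into anchored word tuples) can be modeled roughly as follows. This is a simplified Python sketch, not Storm code; real Storm tracks the tuple tree with an XOR-based acker rather than a dict, and these names are illustrative:

```python
# Toy model of anchoring: each emitted child tuple is linked back to the
# spout tuple (root) it descends from, so a failure or success anywhere
# in the tree maps to exactly one spout message ID.
root_of = {}      # child tuple id -> originating spout message id
outstanding = {}  # spout message id -> count of unacked tuples in tree

def emit_anchored(child_id, anchor_id):
    root = root_of.get(anchor_id, anchor_id)
    root_of[child_id] = root
    outstanding[root] = outstanding.get(root, 0) + 1

def ack(tuple_id):
    root = root_of.pop(tuple_id)
    outstanding[root] -= 1
    return outstanding[root] == 0   # True once the whole tree is done

# Line tuple "L1" fans out into two word tuples, both anchored to it.
emit_anchored("w1", "L1")
emit_anchored("w2", "L1")
```

The spout's ack() for the line fires only after both word tuples are acked; if any of them fails or times out, fail() fires for the line instead.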



If I kill a worker, this spout does not use any queue, so the message
won't be resent?

Isn't this the purpose of the anchors: to know what to resend in case of
a timeout?

Thanks, Michal









*From:* Michael Rose [mailto:michael@fullcontact.com]
*Sent:* Monday, December 30, 2013 8:55 AM


*To:* user@storm.incubator.apache.org
*Subject:* Re: Guaranteeing message processing on Storm fails



What spout are you using? Guarantees must be enforced by the spout. For
instance, the RabbitMQ spout doesn't ack an AMQP message until ack() is
called for that message tag (or rejects it if fail() is called).



The spout must pass along a unique message identifier to enable this
behavior.



If a tuple goes missing somewhere along the line, fail() will be called
after a timeout. If you kill the acker that tuple was tracked with, it's
then up to the message queue or other impl to be able to replay that
message.


Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com
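The timeout behavior Michael describes can be modeled with a few lines of plain Python (not Storm code; the names are illustrative, and Storm's TOPOLOGY_MESSAGE_TIMEOUT_SECS plays the role of the timeout constant):

```python
import time

# Toy model: the spout remembers when each tuple went out, and anything
# not acked within the timeout gets fail() called on it.
TIMEOUT = 0.05  # seconds, tiny so the demo runs fast

in_flight = {}  # msg_id -> emit timestamp

def emit(msg_id):
    in_flight[msg_id] = time.monotonic()

def expire():
    """Return the IDs whose timeout elapsed without an ack."""
    now = time.monotonic()
    late = [m for m, t in in_flight.items() if now - t > TIMEOUT]
    for m in late:
        del in_flight[m]   # these are the tuples that would be failed
    return late

emit("t1")
time.sleep(0.1)           # no ack arrives in time
timed_out = expire()      # -> ["t1"]
```

Whether the failed tuple is then replayed depends entirely on what the spout (or the queue behind it) does in fail().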



On Sun, Dec 29, 2013 at 11:39 PM, Michal Singer <mi...@leadspace.com>
wrote:

But what I see is that some of the tuples are not replayed. I print out all
the tuples and some don’t arrive when I kill a worker process.

Thanks, Michal



*From:* Michael Rose [mailto:michael@fullcontact.com]
*Sent:* Sunday, December 29, 2013 7:26 PM
*To:* user@storm.incubator.apache.org
*Subject:* Re: Guaranteeing message processing on Storm fails



You are not guaranteed that tuples make it, only that if they go missing
or processing fails they will be replayed from the spout ("at-least-once"
execution).

On Dec 29, 2013 4:47 AM, "Michal Singer" <mi...@leadspace.com> wrote:

Hi, I am trying to test guaranteed message processing on Storm:



I have two nodes:

1.       Node-A contains: supervisor, ui, zookeeper, nimbus

2.       Node-B contains: supervisor

I have 1 spout and 2 bolts (I am actually running the word counter test)



I send about 17 messages.

The first bolt is on Node-B.

The second bolt is split into 4 executors, which are divided between the
nodes.



I kill the worker on Node-B, and I expect the worker on the other node to
get the data that was supposed to be sent to Node-B.

But the worker is restarted on Node-B and the data is sent there, except
for one tuple, which is missing.



1.       According to the UI, it is very difficult to see what is going
on, because I guess messages are resent and it is hard to know exactly
what happened.

2.       According to my output, one message is missing which was supposed
to go to Node-B, whose worker I killed.



I defined anchors and I sent acks.



So what am I missing? Why is there data loss?



Thanks, Michal











-- 

*Abhishek Bhattacharjee*

*Pune Institute of Computer Technology*

RE: Guaranteeing message processing on Storm fails

Posted by Michal Singer <mi...@leadspace.com>.
After a few tests I see that in some scenarios the message is replayed by
the spout. So why not always?

thanks




RE: Guaranteeing message processing on Storm fails

Posted by Michal Singer <mi...@leadspace.com>.
Hi, I didn’t work with Kafka. I worked with ActiveMQ.

I worked with ActiveMQ using Spring configuration, and I didn’t need to
recover the session.

If the listener on the JMS queue did not ack a message and I restarted the
listener, it got the message again in onMessage.

I am not sure whether it worked due to some behind-the-scenes Spring
behavior that differs from the JMS Storm example.

In any case, in Storm, if I restart a worker and the message was not acked
before, it is not replayed, and I don’t know why.

Thanks, Michal
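The ActiveMQ behavior described here, where an unacked message is redelivered after the consumer restarts, can be sketched with a toy broker model. This is plain Python, not the real JMS/ActiveMQ API; it only illustrates client-acknowledge semantics:

```python
# Toy model of client-acknowledge delivery: the broker keeps a message
# until the consumer acks it, so a consumer that dies before acking
# sees the message again after it reconnects.
class Broker:
    def __init__(self):
        self.queue = []        # messages not yet delivered
        self.unacked = []      # delivered but not yet acknowledged

    def send(self, msg):
        self.queue.append(msg)

    def receive(self):
        msg = self.queue.pop(0)
        self.unacked.append(msg)
        return msg

    def ack(self, msg):
        self.unacked.remove(msg)

    def consumer_died(self):
        # Broker requeues everything the dead consumer never acked.
        self.queue = self.unacked + self.queue
        self.unacked = []

broker = Broker()
broker.send("m1")
first = broker.receive()   # consumer gets m1 but crashes before acking
broker.consumer_died()
again = broker.receive()   # after reconnecting, m1 is delivered again
```

For this to help a Storm topology, the spout's fail() must translate into not acking (or explicitly recovering) the JMS session, so the broker knows to redeliver.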








RE: Guaranteeing message processing on Storm fails

Posted by Adrian Mocanu <am...@verticalscope.com>.
Hi Michal,
How did you solve this ack problem with the kafka spout?

I'm not sure what you mean by "working with AMP in a different consolation". What changes did you make to the Kafka spout code (URL)?

Thanks
Adrian


RE: Guaranteeing message processing on Storm fails

Posted by Michal Singer <mi...@leadspace.com>.
I added the code: Code for JMS Storm Spout example

Please note that, other than the log messages I added, it is the same as
the example I found in the Storm examples.

What I don’t understand is why AMQ does not resend messages that were not
acked.

I killed a worker, and as a result a message was failed, but AMQ still did
not resend it.

I also don’t understand the reason for the RecoveryTask in the example.

When I worked with AMQ in a different consolation, it did resend messages.



Thanks, Michal





*From:* Abhishek Bhattacharjee [mailto:abhishek.bhattacharjee11@gmail.com]
*Sent:* Thursday, January 02, 2014 1:28 PM
*To:* user@storm.incubator.apache.org
*Subject:* Re: Guaranteeing message processing on strom fails



Can you give a link to your code(only the spout implementation). You can
use http://pastebin.com/.



On Thu, Jan 2, 2014 at 1:04 PM, Michal Singer <mi...@leadspace.com> wrote:

Hi, thanks for this reply.

I read now this chapter. I am trying the jms example which does support the
ack and fail methods.

I am still having issues with lost data. I am checking this to see if I
find any problem.



Please let me know if anyone knows of issues with this example and
guaranteeing messages.

Thanks, Michal



*From:* Abhishek Bhattacharjee [mailto:abhishek.bhattacharjee11@gmail.com]
*Sent:* Tuesday, December 31, 2013 10:35 AM


*To:* user@storm.incubator.apache.org
*Subject:* Re: Guaranteeing message processing on strom fails



Okay so I think what you are doing is using the storm-starter to check
whether message processing is guaranteed.

But I think you are not replaying the messages in the spout (I could be
wrong). Are you changing the fail() method inside the spout ?

fail() method must be used to replay the messages by the programmer it is
not automatically done by storm.

Have a look at this for better understanding.

https://github.com/storm-book/examples-ch04-spouts/blob/master/src/main/java/banktransactions/TransactionsSpouts.java



And read the 4th chapter from this book

http://shop.oreilly.com/product/0636920024835.do



And if you are doing it then the uniqueness of the message could be a valid
reason.



On Tue, Dec 31, 2013 at 12:29 PM, Michal Singer <mi...@leadspace.com>
wrote:

thanks, I am checking this out. This might be a problem.

But there is still this issue:



Ui – some streams are missing in UI as a result of the worker being killed
or rebalance



*From:* Nathan Leung [mailto:ncleung@gmail.com]
*Sent:* Monday, December 30, 2013 6:35 PM
*To:* user


*Subject:* Re: Guaranteeing message processing on strom fails



You are using the sentence as the message ID?  The word count example
repeats sentences, and your message IDs need to be unique.



On Mon, Dec 30, 2013 at 2:05 AM, Michal Singer <mi...@leadspace.com> wrote:

In my test I am using the word counter that was in the code samples of
storm starter.

The first spout is a the word reader that reads lines from files and sends
them to the word normalizer which send the words in the lines to the word
counter.

The spout emits: List<Object> tuple, Object messageId

The message id is the line.

The word normalizer emits: Collection<Tuple> anchors, List<Object> tuple
and sends ack at the end of the execute method

The word counter sends ack on the input it receives.



If I kill a worker, this spout does not use any queue, so it won’t be
resent?

Isn’t this the purpose of the anchors? Do know what to send in case of
timeout?

Thanks, Michal









*From:* Michael Rose [mailto:michael@fullcontact.com]
*Sent:* Monday, December 30, 2013 8:55 AM


*To:* user@storm.incubator.apache.org
*Subject:* Re: Guaranteeing message processing on strom fails



What spout are you using? Guarantees must be enforced by spout. For
instance, the RabbitMQ spout doesn't ack an AMQP message until ack() is
called for that message tag (or rejects if fail() is called).



The spout must pass along a unique message identifier to enable this
behavior.



If a tuple goes missing somewhere along the line, fail() will be called
after a timeout. If you kill the acker that tuple was tracked with, it's
then up to the message queue or other impl to be able to replay that
message.


Michael Rose (@Xorlev <https://twitter.com/xorlev>)
Senior Platform Engineer, FullContact <http://www.fullcontact.com/>
michael@fullcontact.com



On Sun, Dec 29, 2013 at 11:39 PM, Michal Singer <mi...@leadspace.com>
wrote:

But what I see is that some of the tuples are not replayed. I print out all
the tuples and some don’t arrive when I kill a worker process.

Thanks, Michal



*From:* Michael Rose [mailto:michael@fullcontact.com]
*Sent:* Sunday, December 29, 2013 7:26 PM
*To:* user@storm.incubator.apache.org
*Subject:* Re: Guaranteeing message processing on strom fails



You are not guaranteed that tuples make it, only that if they go missing or
processing fails they will be replayed from the spout ("at-least-once execution").

On Dec 29, 2013 4:47 AM, "Michal Singer" <mi...@leadspace.com> wrote:

Hi, I am trying to test the guaranteeing of messages on storm:



I have two nodes:

1.       Node-A contains: supervisor, ui, zookeeper, nimbus

2.       Node-B contains: supervisor

I have 1 spout and 2 bolts (I am actually running the word counter test)



I send about 17 messages.

The first bolt is on Node-B

The second bolt is split into 4 executors, which are divided between the
nodes.



I kill the worker on node-B and I expect the worker on the other node to
get the data which was supposed to be sent to Node-B.

But the worker is restarted on Node-B and the data is sent there, except for
one tuple which is missing.



1.       According to the ui – it is very difficult to see what is going on,
because I guess there are messages resent and it is hard to know exactly what
happened.

2.       According to my output – one message is missing which was supposed
to go to Node-B, whose worker I killed.



I defined anchors and I sent acks.



So what am I missing? Why is there data loss?



Thanks, Michal











-- 

*Abhishek Bhattacharjee*

*Pune Institute of Computer Technology*






Re: Guaranteeing message processing on strom fails

Posted by Abhishek Bhattacharjee <ab...@gmail.com>.
Can you give a link to your code (only the spout implementation)? You can
use http://pastebin.com/.


On Thu, Jan 2, 2014 at 1:04 PM, Michal Singer <mi...@leadspace.com> wrote:

> Hi, thanks for this reply.
>
> I read now this chapter. I am trying the jms example which does support
> the ack and fail methods.
>
> I am still having issues with lost data. I am checking this to see if I
> find any problem.
>
>
>
> Please let me know if anyone knows of issues with this example and
> guaranteeing messages.
>
> Thanks, Michal
>



-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*