Posted to user@storm.apache.org by Sandeep Samudrala <sa...@gmail.com> on 2017/02/21 09:58:00 UTC

Introduce lag into a topology spout to process the events with a delay.

Hello,
 I have two streams, A and B. I need to enrich events coming from stream B
with events coming from stream A, so I store the events from A in a key-value
store. Events that don't get enriched are sent to a deferred queue (a Kafka
topic) and are read back later.

Most of the time, the events from stream B are sent to the deferred queue
because of a slight delay in storing the events from stream A into the
key-value store, even though events arrive on both A and B almost in real
time.

I want to introduce a delay into my spout reading from stream B, so that a
higher percentage of events get enriched on the first attempt rather than
after being read back from the deferred queue. I tried putting a check on the
lag and throttling based on the backlog queue, but it didn't seem right and
ran into draining and other issues.

Is there a way in the Kafka consumer or a Storm spout to delay incoming data
before it is processed?

Thanks,
-sandeep.

Re: Introduce lag into a topology spout to process the events with a delay.

Posted by Ankur Garg <an...@gmail.com>.
Thanks Sandeep for explaining the context. I believe Storm does not provide a
solution for this out of the box (unless you use Trident and process events
in batches). I am not very familiar with the Kafka spout, so I can't say. So
I am going to suggest some things that can be done (again, possibly making
assumptions).

It appears (combining all the information) that stream B can be real time
with respect to A, or delayed by up to 24 hours. While I do not know the SLA
or the complete use case, adding a delay or sleep will on average be fine (as
stream B is already delayed, should it matter if it gets delayed by a minute
or two more?).


Alternatively, since you are using a normal spout, I assume each event is
independent of the others (as you are processing one event at a time).
Assuming the only bottleneck here is that your writes to the key-value store
are not fast enough (you have the data for enriching from stream A, you just
cannot write it in time for it to be available when the data from stream B
arrives), you could write the events that could not be enriched back to the
same or a different topic instead of sending them to the deferred queue. The
chances of the data being available in the key-value store by then are
higher. The events that are enriched can be emitted to a different bolt. This
doesn't work if the enriching data is not present in stream A at all.
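As a rough illustration of this re-enqueue idea (plain Java, not Storm or
Kafka API; the topic names and retry limit are hypothetical), the routing
decision for an unenriched event could look like:

```java
// Sketch: route an event that failed enrichment either back to its source
// topic (to retry soon, once the stream-A write has likely landed) or to
// the deferred queue (give up for now). All names are made up.
public class ReEnqueueRouter {
    public static final String SOURCE_TOPIC = "stream-b";            // assumed name
    public static final String DEFERRED_TOPIC = "stream-b-deferred"; // assumed name

    // Decide where an unenriched event goes, given how often it was retried.
    public static String routeUnenriched(int retryCount, int maxRetries) {
        return retryCount < maxRetries ? SOURCE_TOPIC : DEFERRED_TOPIC;
    }
}
```

A retry counter could travel with the event itself, for example as a Kafka
message header, so this decision stays stateless.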

Assuming the above solution is not good: do you know beforehand which stream
is delayed by how much? To elaborate, based on the delay you can read them in
different spouts from different Kafka topics, or process them in separate
bolts, and introduce a sleep/delay in each bolt accordingly.

Though with the info you have provided, I would rather check after each loop
(after enriching B with A and before writing to the deferred queue) the
percentage of events enriched (if it can be computed) and retry enriching
until the desired percentage is reached. The rest can be pushed to the
deferred queue.
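A sketch of this retry-until-threshold loop in plain Java (hypothetical names
and parameters; the real topology would emit enriched events downstream and
defer the rest):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: retry enriching a batch of stream-B events against the key-value
// store until a target percentage is enriched, then defer whatever is left.
public class EnrichRetry {
    // Returns the events that are still unenriched after retrying.
    public static List<String> enrichWithRetry(List<String> events,
                                               Map<String, String> kvStore,
                                               double targetRatio,
                                               int maxRetries,
                                               long retryDelayMs) {
        List<String> pending = new ArrayList<>(events);
        int total = events.size();
        if (total == 0) {
            return pending;
        }
        for (int attempt = 0; attempt <= maxRetries && !pending.isEmpty(); attempt++) {
            List<String> stillPending = new ArrayList<>();
            for (String key : pending) {
                if (!kvStore.containsKey(key)) {
                    stillPending.add(key); // no matching stream-A data yet
                }
                // else: enriched; the real topology would emit it downstream here
            }
            pending = stillPending;
            double enrichedRatio = (double) (total - pending.size()) / total;
            if (enrichedRatio >= targetRatio) {
                break; // good enough; defer the remainder
            }
            try {
                Thread.sleep(retryDelayMs); // give stream A time to land in the store
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return pending; // these go to the deferred queue
    }
}
```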

Thanks
Ankur

On Thu, 23 Feb 2017 at 14:04 Sandeep Samudrala <sa...@gmail.com> wrote:

Running both streams in the same topology doesn't work for me, as stream B
events can arrive very late, up to 24 hours.

Sleep doesn't work either, as it will slow down the topology. I want the
topology to run as fast as possible, with just enough delay to ensure
enrichment succeeds on the first attempt.

To give more context: the events from stream B can arrive as late as 24
hours, hence I keep the events from stream A in a key-value store. I am using
a normal spout and not Trident, as I am handling events one by one (record by
record). I don't think blocking at the tuple level is a good idea, as it will
slow down event processing.

For now, I am working with a hack that checks the current backlog against a
message header in the Kafka event to find events that should be processed
with a delay. Although it works to some extent, I am still not able to get it
fully working.

Please let me know if I can add more context.

On Wed, Feb 22, 2017 at 3:23 PM, Ankur Garg <an...@gmail.com> wrote:

Are you processing the events in the Storm topology in batches (Trident
spout) or with a normal spout?

The way I see it (this is very trivial and I am sure you have thought about
it): if you can introduce a sleep in the nextTuple method for stream B (in
the case of a normal spout), or increase the value of
topology.max.spout.pending in the case of Trident, that can help you achieve
a better percentage. You can also think of making nextTuple blocking
(although not recommended in general, as everything runs in a single thread,
so your ack/fail/emit can get delayed, but I believe it can be fine in your
case).
Alternatively, since both streams are almost real time, you could read from
both streams in the same spout and do the enriching there, instead of writing
stream A into a key-value store and then enriching.

Obviously, I am making a lot of assumptions here, since they are not
mentioned in the question and I am not aware of the full context of the
problem either.

Hope this helps
Ankur

On Wed, 22 Feb 2017 at 11:22 Sandeep Samudrala <sa...@gmail.com> wrote:

Yes. I am reading both streams from Kafka as part of a topology.

On Wed, Feb 22, 2017 at 12:39 AM, Ankur Garg <an...@gmail.com> wrote:

Hi Sandeep,

One question: how are you reading streams B and A? Are you reading from some
messaging queue (Kafka, RabbitMQ, etc.) with a spout (as part of a topology)?
Please confirm.

Thanks
Ankur

On Tue, 21 Feb 2017 at 15:28 Sandeep Samudrala <sa...@gmail.com> wrote:

Hello,
 I have two streams, A and B. I need to enrich events coming from stream B
with events coming from stream A, so I store the events from A in a key-value
store. Events that don't get enriched are sent to a deferred queue (a Kafka
topic) and are read back later.

Most of the time, the events from stream B are sent to the deferred queue
because of a slight delay in storing the events from stream A into the
key-value store, even though events arrive on both A and B almost in real
time.

I want to introduce a delay into my spout reading from stream B, so that a
higher percentage of events get enriched on the first attempt rather than
after being read back from the deferred queue. I tried putting a check on the
lag and throttling based on the backlog queue, but it didn't seem right and
ran into draining and other issues.

Is there a way in the Kafka consumer or a Storm spout to delay incoming data
before it is processed?

Thanks,
-sandeep.

Re: Introduce lag into a topology spout to process the events with a delay.

Posted by Sandeep Samudrala <sa...@gmail.com>.
Running both streams in the same topology doesn't work for me, as stream B
events can arrive very late, up to 24 hours.

Sleep doesn't work either, as it will slow down the topology. I want the
topology to run as fast as possible, with just enough delay to ensure
enrichment succeeds on the first attempt.

To give more context: the events from stream B can arrive as late as 24
hours, hence I keep the events from stream A in a key-value store. I am using
a normal spout and not Trident, as I am handling events one by one (record by
record). I don't think blocking at the tuple level is a good idea, as it will
slow down event processing.

For now, I am working with a hack that checks the current backlog against a
message header in the Kafka event to find events that should be processed
with a delay. Although it works to some extent, I am still not able to get it
fully working.
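The core of that check, sketched in plain Java (the idea of a timestamp
header is from my setup; the method and parameter names are made up): an
event is only eligible once it is at least the configured delay old,
according to the timestamp carried in the message.

```java
// Sketch: gate events on their age. An event with a produce timestamp in its
// Kafka header is processed only once it is at least `delayMs` old.
public class DelayGate {
    // True if the event is old enough to be processed now.
    public static boolean isReady(long eventTimestampMs, long nowMs, long delayMs) {
        return nowMs - eventTimestampMs >= delayMs;
    }

    // How much longer the spout would need to hold off before the event matures.
    public static long remainingDelayMs(long eventTimestampMs, long nowMs, long delayMs) {
        return Math.max(0L, delayMs - (nowMs - eventTimestampMs));
    }
}
```

Since Kafka delivers a partition in order, checking only the head event's age
is enough: if the head is too young, everything behind it is too.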

Please let me know if I can add more context.

On Wed, Feb 22, 2017 at 3:23 PM, Ankur Garg <an...@gmail.com> wrote:

> Are u processing the events in Storm topology in batch (Trident Spout) or
> Normal Spout .
>
> The way I see (this is very trivial and am sure you would have thought
> about it)  is if u can introduce sleep in the nextTuple method for Stream B
> (in case of Normal Spout) or increasing the value *topology.max.spout.pending
> in case of Trident can help you achieve better %age . You can also think of
> making nextTuple blocking (although not recommended in general as
> everything runs in a single thread so ur ack/fail/emit can get delayed but
> I believe it can be fine in your case). *
> *Alternatively , since almost both the streams are real time , u could
> read from both streams in the same spout and then do enriching instead of
> writing the stream A into some key value store and then perform enriching .*
>
> *Obviously , I am making lot of assumptions here since they are not
> mentioned in the question and I am not aware of full context of the problem
> too . *
>
> *Hope this helps*
> *Ankur*
>
> On Wed, 22 Feb 2017 at 11:22 Sandeep Samudrala <sa...@gmail.com>
> wrote:
>
>> Yes. I am reading both the streams from kafka as part of a topology.
>>
>> On Wed, Feb 22, 2017 at 12:39 AM, Ankur Garg <an...@gmail.com>
>> wrote:
>>
>> Hi Sandeep ,
>>
>> One question :- how are you reading Streams B and A . Are u reading from
>> some messaging queue (Kafka , Rabbit Mq etc.) with some spout (as part of
>> some topology) reading from them . Please confirm .
>>
>> Thanks
>> Ankur
>>
>> On Tue, 21 Feb 2017 at 15:28 Sandeep Samudrala <sa...@gmail.com>
>> wrote:
>>
>> Hello,
>>  I have two streams A and B. I need to enrich events coming from stream B
>> with events coming from A and I store events coming from A in a key-value
>> store to enrich events from B. Events that doesn't get enriched are sent to
>> a deferred queue(kafka stream) and are read back later.
>>
>> Most of the the time the events from Stream B are sent to defer queue
>> because of bit delay in storing the events into a key-value store from
>> Stream A and events coming into A and B are almost real time.
>>
>> I want to introduce a delay into reading into my spout reading from
>> Stream B so as to make sure higher % of events get enriched in first shot
>> rather than getting enriched post reading from defer queue. I tried putting
>> a check on the lag and controlling on the backlog queue to get a hold but
>> didn't seemed right and would enter into draining and other issues.
>>
>> Is there a way in the kafka consumer or Storm spout to control the data
>> in flow to come with delay for processing?
>>
>> Thanks,
>> -sandeep.
>>
>>
>>

Re: Introduce lag into a topology spout to process the events with a delay.

Posted by Ankur Garg <an...@gmail.com>.
Are you processing the events in the Storm topology in batches (Trident
spout) or with a normal spout?

The way I see it (this is very trivial and I am sure you have thought about
it): if you can introduce a sleep in the nextTuple method for stream B (in
the case of a normal spout), or increase the value of
topology.max.spout.pending in the case of Trident, that can help you achieve
a better percentage. You can also think of making nextTuple blocking
(although not recommended in general, as everything runs in a single thread,
so your ack/fail/emit can get delayed, but I believe it can be fine in your
case).
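A non-blocking variant of the same idea, sketched in plain Java (not Storm or
Kafka API; the class and method names are made up): buffer fetched stream-B
events in a DelayQueue so each one becomes visible only after it has aged
past the configured delay, and let nextTuple emit whatever has matured
instead of sleeping.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Sketch: hold fetched events back until they are at least `delayMs` old,
// without ever blocking the calling thread.
public class HoldBackBuffer {
    private final DelayQueue<DelayedEvent> queue = new DelayQueue<>();
    private final long delayMs;

    public HoldBackBuffer(long delayMs) {
        this.delayMs = delayMs;
    }

    // Called when the consumer fetches an event; fetchedAtMs lets tests inject time.
    public void add(String event, long fetchedAtMs) {
        queue.put(new DelayedEvent(event, fetchedAtMs + delayMs));
    }

    public void add(String event) {
        add(event, System.currentTimeMillis());
    }

    // Called from nextTuple: a matured event, or null if nothing is ready yet.
    public String pollReady() {
        DelayedEvent e = queue.poll();
        return e == null ? null : e.payload;
    }

    private static final class DelayedEvent implements Delayed {
        final String payload;
        final long readyAtMs;

        DelayedEvent(String payload, long readyAtMs) {
            this.payload = payload;
            this.readyAtMs = readyAtMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}
```

When pollReady returns null, nextTuple can simply return without emitting, so
acks and fails keep flowing while the buffered events age.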
Alternatively, since both streams are almost real time, you could read from
both streams in the same spout and do the enriching there, instead of writing
stream A into a key-value store and then enriching.

Obviously, I am making a lot of assumptions here, since they are not
mentioned in the question and I am not aware of the full context of the
problem either.

Hope this helps
Ankur

On Wed, 22 Feb 2017 at 11:22 Sandeep Samudrala <sa...@gmail.com> wrote:

> Yes. I am reading both the streams from kafka as part of a topology.
>
> On Wed, Feb 22, 2017 at 12:39 AM, Ankur Garg <an...@gmail.com> wrote:
>
> Hi Sandeep ,
>
> One question :- how are you reading Streams B and A . Are u reading from
> some messaging queue (Kafka , Rabbit Mq etc.) with some spout (as part of
> some topology) reading from them . Please confirm .
>
> Thanks
> Ankur
>
> On Tue, 21 Feb 2017 at 15:28 Sandeep Samudrala <sa...@gmail.com>
> wrote:
>
> Hello,
>  I have two streams A and B. I need to enrich events coming from stream B
> with events coming from A and I store events coming from A in a key-value
> store to enrich events from B. Events that doesn't get enriched are sent to
> a deferred queue(kafka stream) and are read back later.
>
> Most of the the time the events from Stream B are sent to defer queue
> because of bit delay in storing the events into a key-value store from
> Stream A and events coming into A and B are almost real time.
>
> I want to introduce a delay into reading into my spout reading from Stream
> B so as to make sure higher % of events get enriched in first shot rather
> than getting enriched post reading from defer queue. I tried putting a
> check on the lag and controlling on the backlog queue to get a hold but
> didn't seemed right and would enter into draining and other issues.
>
> Is there a way in the kafka consumer or Storm spout to control the data in
> flow to come with delay for processing?
>
> Thanks,
> -sandeep.
>
>
>

Re: Introduce lag into a topology spout to process the events with a delay.

Posted by Sandeep Samudrala <sa...@gmail.com>.
Yes. I am reading both streams from Kafka as part of a topology.

On Wed, Feb 22, 2017 at 12:39 AM, Ankur Garg <an...@gmail.com> wrote:

> Hi Sandeep ,
>
> One question :- how are you reading Streams B and A . Are u reading from
> some messaging queue (Kafka , Rabbit Mq etc.) with some spout (as part of
> some topology) reading from them . Please confirm .
>
> Thanks
> Ankur
>
> On Tue, 21 Feb 2017 at 15:28 Sandeep Samudrala <sa...@gmail.com>
> wrote:
>
>> Hello,
>>  I have two streams A and B. I need to enrich events coming from stream B
>> with events coming from A and I store events coming from A in a key-value
>> store to enrich events from B. Events that doesn't get enriched are sent to
>> a deferred queue(kafka stream) and are read back later.
>>
>> Most of the the time the events from Stream B are sent to defer queue
>> because of bit delay in storing the events into a key-value store from
>> Stream A and events coming into A and B are almost real time.
>>
>> I want to introduce a delay into reading into my spout reading from
>> Stream B so as to make sure higher % of events get enriched in first shot
>> rather than getting enriched post reading from defer queue. I tried putting
>> a check on the lag and controlling on the backlog queue to get a hold but
>> didn't seemed right and would enter into draining and other issues.
>>
>> Is there a way in the kafka consumer or Storm spout to control the data
>> in flow to come with delay for processing?
>>
>> Thanks,
>> -sandeep.
>>
>

Re: Introduce lag into a topology spout to process the events with a delay.

Posted by Ankur Garg <an...@gmail.com>.
Hi Sandeep,

One question: how are you reading streams B and A? Are you reading from some
messaging queue (Kafka, RabbitMQ, etc.) with a spout (as part of a topology)?
Please confirm.

Thanks
Ankur

On Tue, 21 Feb 2017 at 15:28 Sandeep Samudrala <sa...@gmail.com> wrote:

> Hello,
>  I have two streams A and B. I need to enrich events coming from stream B
> with events coming from A and I store events coming from A in a key-value
> store to enrich events from B. Events that doesn't get enriched are sent to
> a deferred queue(kafka stream) and are read back later.
>
> Most of the the time the events from Stream B are sent to defer queue
> because of bit delay in storing the events into a key-value store from
> Stream A and events coming into A and B are almost real time.
>
> I want to introduce a delay into reading into my spout reading from Stream
> B so as to make sure higher % of events get enriched in first shot rather
> than getting enriched post reading from defer queue. I tried putting a
> check on the lag and controlling on the backlog queue to get a hold but
> didn't seemed right and would enter into draining and other issues.
>
> Is there a way in the kafka consumer or Storm spout to control the data in
> flow to come with delay for processing?
>
> Thanks,
> -sandeep.
>