Posted to user@flink.apache.org by Akshay Shinde <ak...@oracle.com> on 2020/02/12 02:34:29 UTC

Dedup all data in stream

Hi Community 

In our Flink job, the source is a custom stream that emits 'N' objects every 2 minutes, and for each object from the generated source stream a process function performs an operation that we expect to finish within 2 minutes.

Every 2 minutes we generate the same 'N' objects in the stream for the process function to process. In some cases, however, the process function takes longer, around 10 minutes. Because the source keeps adding 'N' objects to the stream every 2 minutes while the process function is busy for those 10 minutes, the stream ends up holding 5 sets of the 'N' objects. The problem is that we don't want to process these objects 5 times; we want to process only the latest set of 'N' objects, once.

Depending on how long the process function takes, this lag between the source and the process function can grow or shrink during job execution.


Thanks in advance !!!

Re: Dedup all data in stream

Posted by Kostas Kloudas <kk...@apache.org>.
Hi Akshay,

Is your use case that the input stream consists of metrics from these
1000s of resources, the ProcessFunction aggregates them in windows of
2 min and does some analysis on these metrics, and this analysis may
take more than 2 min, so that you create backpressure to the source?

If that is the case, and the metric records are timestamped, then you
can use event time: you will have your metrics in the correct
timestamp order, and eventually your stream will catch up (assuming
that you have enough resources, i.e. parallelism).

If you want this analysis of the incoming metrics to be performed by
another thread, and any other incoming records to be ignored while it
is running, then you should look towards the AsyncIO direction that I
posted previously.

This will give you both fault tolerance and asynchronous processing.
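For the "process only the latest 'N' objects once" requirement itself, one Flink-agnostic way to frame it is to tag every emitted batch with a generation number and have the consumer drop records from any generation older than the newest one it has seen. A minimal sketch in plain Java (no Flink APIs; the class and method names are made up for illustration):

```java
import java.util.concurrent.atomic.AtomicLong;

// Tracks the newest "generation" seen so far (one generation = one batch
// of N objects emitted by the source). Consumers skip records from stale
// generations, so a backlog of 5 batches collapses to only the latest one.
class LatestGenerationGate {
    private final AtomicLong newestSeen = new AtomicLong(Long.MIN_VALUE);

    // Returns true if this record belongs to the newest generation seen so
    // far and should be processed; false if a newer batch already arrived.
    boolean shouldProcess(long generation) {
        long current;
        do {
            current = newestSeen.get();
            if (generation < current) {
                return false; // stale batch: drop
            }
        } while (!newestSeen.compareAndSet(current, generation));
        return true;
    }
}
```

In a real job the generation number could simply be the source's emission timestamp, and the gate's value could be kept in Flink state rather than an AtomicLong so that it survives failures.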

Cheers,
Kostas

On Wed, Feb 12, 2020 at 6:33 PM Akshay Shinde <ak...@oracle.com> wrote:
>
> Hi Kostas
>
> We scan 1000s of resources, and we want to do this at some interval, currently every 2 minutes. The scan is the same operation each time, checking whether everything is OK. Sometimes this scan takes a lot longer, which creates lag, and in the stream (which we produce from the source function) we end up with multiple sets of data for the same 1000s of resources. In that case we are fine with performing the scan operation only once for all the sets currently in the stream.
>
> Parallelism for the source function is 1, and for the process function it is currently 2.
>
> Thanks for the response.
>
> —
> Akshay

Re: Dedup all data in stream

Posted by Akshay Shinde <ak...@oracle.com>.
Hi Kostas

We scan 1000s of resources, and we want to do this at some interval, currently every 2 minutes. The scan is the same operation each time, checking whether everything is OK. Sometimes this scan takes a lot longer, which creates lag, and in the stream (which we produce from the source function) we end up with multiple sets of data for the same 1000s of resources. In that case we are fine with performing the scan operation only once for all the sets currently in the stream.

Parallelism for the source function is 1, and for the process function it is currently 2.

Thanks for the response.

—
Akshay   

> On Feb 12, 2020, at 2:07 AM, Kostas Kloudas <kk...@apache.org> wrote:
> 
> Hi Akshay,
> 
> Could you be more specific about what you are trying to achieve with this scheme?
> 
> I am asking because if your source is too fast and you want to slow it
> down so that it produces data at the same rate as your process
> function can consume it, then Flink's backpressure will eventually
> do this.
> 
> If you want your process function to discard incoming elements (rather
> than queue them) while it is processing another element, then this
> implies a multithreaded process function, and I would look towards the
> AsyncIO [1] pattern, with the AsyncFunction setting a flag to busy
> while processing and back to false when it is done and ready to
> process the next element.
> 
> Also, in order to help, I would need more information about whether
> the stream is keyed or non-keyed, and about the parallelism of the
> source compared to that of the process function.
> 
> I hope this helps,
> Kostas
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html


Re: Dedup all data in stream

Posted by Kostas Kloudas <kk...@apache.org>.
Hi Akshay,

Could you be more specific about what you are trying to achieve with this scheme?

I am asking because if your source is too fast and you want to slow it
down so that it produces data at the same rate as your process
function can consume it, then Flink's backpressure will eventually
do this.

If you want your process function to discard incoming elements (rather
than queue them) while it is processing another element, then this
implies a multithreaded process function, and I would look towards the
AsyncIO [1] pattern, with the AsyncFunction setting a flag to busy
while processing and back to false when it is done and ready to
process the next element.

Also, in order to help, I would need more information about whether
the stream is keyed or non-keyed, and about the parallelism of the
source compared to that of the process function.

I hope this helps,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
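Independent of Flink's AsyncIO API, the busy-flag idea boils down to a compare-and-set gate that admits one element at a time and tells callers to discard anything that arrives while work is in flight. A rough sketch in plain Java (the names are illustrative, not Flink classes):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// One-element admission gate: tryBegin() succeeds for at most one caller
// until finish() is called; elements arriving in between are dropped.
class BusyGate {
    private final AtomicBoolean busy = new AtomicBoolean(false);

    // Returns true if the caller may start processing; false if another
    // element is already in flight (the caller should discard its input).
    boolean tryBegin() {
        return busy.compareAndSet(false, true);
    }

    // Marks processing as done so the next element can be admitted.
    void finish() {
        busy.set(false);
    }
}
```

An AsyncFunction could consult such a gate in asyncInvoke and, for elements that find the gate busy, complete the ResultFuture immediately with an empty result so nothing is emitted for them.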
