Posted to user@flink.apache.org by Komal Mariam <ko...@gmail.com> on 2019/10/08 10:46:12 UTC

Passing parameters to filter function (in DataStreams)

Hi everyone,

Suppose I have to compute a threshold for a filter condition:

Integer threshold = computeThreshold();

If I write:

temperatureStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer temperature) throws Exception {
        Integer threshold = computeThreshold();
        return temperature > threshold;
    }
});

Would this mean that the threshold is computed over and over again, once for
every new element in the stream?

My threshold does not change once it is computed, so I don't want to recompute
it for every new element. Is there a way I can pass it as a parameter
to the filter function?

Re: Passing parameters to filter function (in DataStreams)

Posted by Theo Diefenthal <th...@scoop-software.de>.
Hi, 

From your original post, it looks like "computeThreshold" doesn't require any parameters but is simply expensive to compute. 

In that case, you can inherit from "RichFilterFunction" instead of "FilterFunction". With a "RichFilterFunction", you can override the "open" method, perform the computation there just once, and store the result, e.g. in a transient field. That way, the result is not serialized and sent over the network along with the function. The open method is called once per parallel instance of the operator, before the first call to "filter" is made. 
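A minimal sketch of the open() approach, assuming the Flink 1.x API of that time (RichFilterFunction with an open(Configuration) hook; newer releases deprecate that overload in favor of open(OpenContext)). The class name ThresholdFilter and the body of computeThreshold() are hypothetical placeholders for your own code.

```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

class ThresholdFilter extends RichFilterFunction<Integer> {

    // transient: excluded from Java serialization, so it is not shipped with the
    // function; it is filled in on the worker by open().
    private transient Integer threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Runs once per parallel instance, before the first filter() call.
        threshold = computeThreshold();
    }

    @Override
    public boolean filter(Integer temperature) {
        // No recomputation here: just a comparison per record.
        return temperature > threshold;
    }

    // Hypothetical placeholder for the expensive computation from the question.
    private static Integer computeThreshold() {
        return 20;
    }
}
```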

Passing arguments this way is totally fine in general. I often pass, e.g., a connection string as a constructor parameter to my RichFunction and, within the function's open method, establish the connection to the remote system. 
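A sketch of the "pass a parameter, build the resource in open()" pattern, under the same Flink 1.x assumption. As a hypothetical stand-in for a connection string, the constructor takes the path of a text file containing the threshold: only that String is serialized with the function, and the file is read once per parallel instance in open(), so the path must be readable on every worker. The class name and the file format are illustrative assumptions; a real connection would also be released by overriding close().

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

class FileThresholdFilter extends RichFilterFunction<Integer> {

    // Serializable parameter: travels with the function to the workers.
    private final String thresholdFile;

    // Derived state: built on the worker in open(), never serialized.
    private transient int threshold;

    FileThresholdFilter(String thresholdFile) {
        this.thresholdFile = thresholdFile;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Hypothetical "resource": read the threshold once from the given file.
        String text = new String(Files.readAllBytes(Paths.get(thresholdFile)), StandardCharsets.UTF_8);
        threshold = Integer.parseInt(text.trim());
    }

    @Override
    public boolean filter(Integer temperature) {
        return temperature > threshold;
    }
}
```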

Best regards 
Theo 


From: "Komal Mariam" <ko...@gmail.com> 
To: "Chesnay Schepler" <ch...@apache.org> 
CC: "user" <us...@flink.apache.org> 
Sent: Thursday, 10 October 2019 04:00:46 
Subject: Re: Passing parameters to filter function (in DataStreams) 

Thank you @Chesnay! 

I also managed to pass arguments to a RichFilterFunction, new MyFilterFunc(Integer threshold), by defining its constructor. 
If there's a better way to pass arguments, I'd appreciate it if you let me know. 



Re: Passing parameters to filter function (in DataStreams)

Posted by Komal Mariam <ko...@gmail.com>.
Thank you @Chesnay!

I also managed to pass arguments to a RichFilterFunction, new
MyFilterFunc(Integer threshold), by defining its constructor.
If there's a better way to pass arguments, I'd appreciate it if you let me
know.
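A minimal sketch of this constructor approach. MyFilterFunc is the name used in the message, but its body here is an assumption. The value is computed once on the client and shipped to the workers as part of the serialized function, so the field type has to be serializable (Integer is).

```java
import org.apache.flink.api.common.functions.RichFilterFunction;

class MyFilterFunc extends RichFilterFunction<Integer> {

    // Set once on the client; serialized together with the function instance.
    private final Integer threshold;

    MyFilterFunc(Integer threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(Integer temperature) {
        return temperature > threshold;
    }
}
```

Usage would be along the lines of temperatureStream.filter(new MyFilterFunc(computeThreshold())), with computeThreshold() standing in for your own computation. Since this version overrides neither open() nor close(), implementing the plain FilterFunction interface would work just as well.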

On Tue, 8 Oct 2019 at 19:58, Chesnay Schepler <ch...@apache.org> wrote:

> You can compute the threshold ahead of time and reference it directly in
> the filter function.
>
> (Below are 2 examples, depending on whether you like lambdas or not)
>
> final int threshold = computeThreshold();
> temperatureStream.filter(new FilterFunction<Integer>() {
>     @Override
>     public boolean filter(Integer temperature) {
>         return temperature > threshold;
>     }
> });
>
> final int threshold = computeThreshold();
> temperatureStream.filter(temperature -> temperature > threshold);

Re: Passing parameters to filter function (in DataStreams)

Posted by Chesnay Schepler <ch...@apache.org>.
You can compute the threshold ahead of time and reference it directly in 
the filter function.

(Below are 2 examples, depending on whether you like lambdas or not)

final int threshold = computeThreshold();
temperatureStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer temperature) {
        return temperature > threshold;
    }
});

final int threshold = computeThreshold();
temperatureStream.filter(temperature -> temperature > threshold);
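With this approach, computeThreshold() runs once on the client, and the value captured by the anonymous class or lambda travels with the function object when Flink serializes it for the workers. The sketch below imitates that step with a plain Java serialization round trip; it is an approximation of what happens at job submission, not Flink's actual code path, and ClosureCaptureDemo with its fixed threshold of 20 is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.api.common.functions.FilterFunction;

class ClosureCaptureDemo {

    // Hypothetical stand-in for the expensive computation from the question.
    static int computeThreshold() {
        return 20;
    }

    // Lambda variant: the threshold is computed once, here, on the client.
    static FilterFunction<Integer> buildFilter() {
        final int threshold = computeThreshold();
        // FilterFunction extends Serializable, so this lambda (with its captured int) is serializable.
        return temperature -> temperature > threshold;
    }

    // Rough imitation of shipping the function to a worker: serialize it and read it back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

After roundTrip(buildFilter()), the deserialized filter still keeps 21 and drops 20 without calling computeThreshold() again. One caveat with the anonymous-class variant: if it is written inside a non-static method, it also captures the enclosing instance, which then has to be serializable as well.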

