Posted to user@flink.apache.org by Jaswin Shah <ja...@outlook.com> on 2020/06/11 11:08:15 UTC

Filter function in flink

Hi,

The filter function on a DataStream appears to update the same DataStream rather than creating a new stream and keeping the DataStream the filter is applied to intact. My use case: I apply different filters on a result stream and then process each filtered stream with a different process function, but according to the DAG it looks like all the filter changes happen in the existing original DataStream.

Can anyone help me with how to achieve this?
Here, I want to apply different filters individually before each keyedProcess, as in the DAG below, but every filter should produce a new stream rather than updating the existing DataStream.
[attached image: job DAG]

Re: Filter function in flink

Posted by Chesnay Schepler <ch...@apache.org>.
Flink applies an optimization where multiple operations (like maps and
filters) are deployed as a single operation, since it is more efficient
to call all the functions directly one after another than to transmit
the intermediate data.

This is referred to as "chaining", and the DAG only shows these combined
operations.

Here's another example that shows the original stream staying intact:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> originalInput = env.fromElements(1, 2, 3);

DataStream<Integer> ones = originalInput.filter(value -> value == 1);
DataStream<Integer> twos = originalInput.filter(value -> value == 2);

ones.map(value -> "1s: " + value).print();
twos.map(value -> "2s: " + value).print();
originalInput.map(value -> "original: " + value).startNewChain().print();

env.execute();

Output:

original: 1
original: 2
original: 3
1s: 1
2s: 2

You can disable chaining by calling startNewChain() on a DataStream,
but this is usually only done for demonstration purposes since it
impedes performance.

ones.map(value -> "1s: " + value).startNewChain().print();
twos.map(value -> "2s: " + value).startNewChain().print();
originalInput.map(value -> "original: " + value).startNewChain().print();
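
The effect of chaining can be illustrated outside Flink with plain java.util.function composition (an analogy only, not the DataStream API): a "chained" pipeline fuses the functions so each element flows through all of them in one call, while an "unchained" pipeline materializes the full intermediate result between stages. The class and variable names below are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ChainingAnalogy {
    public static void main(String[] args) {
        Function<Integer, Integer> doubleIt = v -> v * 2;
        Function<Integer, String> label = v -> "value: " + v;

        // "Chained": the two functions are fused and applied back-to-back
        // per element, with no intermediate collection in between.
        Function<Integer, String> fused = doubleIt.andThen(label);
        List<String> chained = new ArrayList<>();
        for (int v : List.of(1, 2, 3)) {
            chained.add(fused.apply(v));
        }

        // "Unchained": each stage materializes its full intermediate result
        // before the next stage runs (the transmission cost chaining avoids).
        List<Integer> intermediate = new ArrayList<>();
        for (int v : List.of(1, 2, 3)) {
            intermediate.add(doubleIt.apply(v));
        }
        List<String> unchained = new ArrayList<>();
        for (int v : intermediate) {
            unchained.add(label.apply(v));
        }

        System.out.println(chained);   // [value: 2, value: 4, value: 6]
        System.out.println(unchained); // same result either way
    }
}
```

Both variants compute the same result; chaining only changes how the work is scheduled, which is why the DAG can show several logical operations as one box.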

On 02/07/2020 18:26, Jaswin Shah wrote:
> Hi Chesnay,
> Thanks for responding. So, according to you, the filter function
> should create a new filtered stream and not update the original
> stream. But then why does the DAG not show them as different branches?
> Are you sure that the filter operation does not change the original
> stream but instead creates a new one?


Re: Filter function in flink

Posted by Jaswin Shah <ja...@outlook.com>.
Hi Chesnay,
Thanks for responding. So, according to you, the filter function should create a new filtered stream and not update the original stream. But then why does the DAG not show them as different branches? Are you sure that the filter operation does not change the original stream but instead creates a new one?
________________________________
From: Chesnay Schepler <ch...@apache.org>
Sent: 11 June 2020 17:29
To: Jaswin Shah <ja...@outlook.com>; user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Filter function in flink

originalStream = ...
filteredStream = originalStream.filter(filterA)
differentlyFilteredStream = originalStream.filter(filterB)

originalStream.map(<this works on the original stream>)
filteredStream.map(<this works on the filtered stream>)
differentlyFilteredStream.map(<this works on the differently filtered stream>)



Re: Filter function in flink

Posted by Chesnay Schepler <ch...@apache.org>.
originalStream = ...
filteredStream = originalStream.filter(filterA)
differentlyFilteredStream = originalStream.filter(filterB)

originalStream.map(<this works on the original stream>)
filteredStream.map(<this works on the filtered stream>)
differentlyFilteredStream.map(<this works on the differently filtered stream>)
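
The same non-mutating behaviour can be seen outside Flink with plain java.util.stream (an analogy only, not the DataStream API): each filter call produces a new result and leaves the source untouched. The class name below is made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterAnalogy {
    public static void main(String[] args) {
        List<Integer> original = List.of(1, 2, 3);

        // Each filter produces a NEW list; nothing is removed from 'original'.
        List<Integer> ones = original.stream()
                .filter(v -> v == 1)
                .collect(Collectors.toList());
        List<Integer> twos = original.stream()
                .filter(v -> v == 2)
                .collect(Collectors.toList());

        System.out.println("original: " + original); // original: [1, 2, 3]
        System.out.println("ones: " + ones);         // ones: [1]
        System.out.println("twos: " + twos);         // twos: [2]
    }
}
```

Flink's DataStream.filter works the same way conceptually: it defines a new downstream branch of the dataflow graph rather than modifying the stream it was called on.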
