Posted to user@flink.apache.org by Jaswin Shah <ja...@outlook.com> on 2020/06/11 11:08:15 UTC
Filter function in flink
Hi,
The filter function on a DataStream seems to update the same datastream in place rather than creating a new stream and leaving the datastream the filter was applied to intact. My use case is that I apply different filters to a result stream and then process each filtered stream differently, but according to the DAG it looks like all the filter changes happen inside the existing original datastream.
Can anyone help me on how to achieve this?
For example, here I want to apply a different filter individually before each keyedProcess, as in the DAG below, but each filter should produce a new stream rather than updating the existing datastream.
[attached image: job DAG]
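The intended fan-out can be sketched with plain Java collections (an illustration only, not the Flink DataStream API; all names below are made up): each filter produces a new collection, each branch gets its own processing step, and the source is left intact.

```java
import java.util.List;
import java.util.stream.Collectors;

public class FanOutSketch {
    public static void main(String[] args) {
        // stand-in for the result stream; names are illustrative only
        List<Integer> resultStream = List.of(1, 2, 3);

        // two independent filters, each producing a new collection
        List<Integer> branchA = resultStream.stream()
                .filter(v -> v == 1).collect(Collectors.toList());
        List<Integer> branchB = resultStream.stream()
                .filter(v -> v == 2).collect(Collectors.toList());

        // each branch is handled by its own "process" step
        branchA.forEach(v -> System.out.println("processA: " + v));
        branchB.forEach(v -> System.out.println("processB: " + v));

        // the original is untouched
        System.out.println("original: " + resultStream);
    }
}
```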
Re: Filter function in flink
Posted by Chesnay Schepler <ch...@apache.org>.
Flink performs an optimization where multiple operations (like maps and
filters) are deployed as one operation, since it is more efficient to
call the functions directly one after another instead of transmitting the
intermediate data.
This is referred to as "chaining", and the DAG only shows these combined
operations.
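As a loose analogy in plain Java (not Flink internals; the names here are made up), chaining amounts to composing consecutive operators into one function that is applied per record, with no intermediate hand-off between the steps:

```java
import java.util.function.Function;

public class ChainingSketch {
    public static void main(String[] args) {
        // two "operators" that would otherwise be separate steps
        Function<Integer, Integer> doubler = v -> v * 2;
        Function<Integer, String> formatter = v -> "value: " + v;

        // "chained": one composed function, applied per record,
        // with no intermediate buffer between the steps
        Function<Integer, String> chained = doubler.andThen(formatter);

        System.out.println(chained.apply(3)); // value: 6
    }
}
```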
Here's another example that shows the original stream staying intact:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> originalInput = env.fromElements(1, 2, 3);
DataStream<Integer> ones = originalInput.filter(value -> value == 1);
DataStream<Integer> twos = originalInput.filter(value -> value == 2);
ones.map(value -> "1s: " + value).print();
twos.map(value -> "2s: " + value).print();
originalInput.map(value -> "original: " + value).startNewChain().print();
env.execute();
Output:
original: 1
original: 2
original: 3
1s: 1
2s: 2
You can disable chaining by calling startNewChain() on a DataStream,
but this is usually only done for demonstration purposes since it
impedes performance.
ones.map(value -> "1s: " + value).startNewChain().print();
twos.map(value -> "2s: " + value).startNewChain().print();
originalInput.map(value -> "original: " + value).startNewChain().print();
Re: Filter function in flink
Posted by Jaswin Shah <ja...@outlook.com>.
Hi Chesnay,
Thanks for responding. So, according to you, the filter function creates a new filtered stream and does not update the original stream. But if that is the case, why does the DAG not show them as different branches? Are you sure that the filter operation does not change the original stream but creates a new one?
Re: Filter function in flink
Posted by Chesnay Schepler <ch...@apache.org>.
originalStream = ...
filteredStream = originalStream.filter(filterA)
differentlyFilteredStream = originalStream.filter(filterB)
originalStream.map(<this works on the original stream>)
filteredStream.map(<this works on the filtered stream>)
differentlyFilteredStream.map(<this works on the differently filtered stream>)
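The point of the pseudocode above is that a filter returns a new, distinct stream object rather than mutating its input. A minimal plain-Java sketch of the same idea (illustrative collections, not the Flink DataStream API):

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterReturnsNew {
    public static void main(String[] args) {
        List<Integer> originalStream = List.of(10, 20, 30);

        // filter builds and returns a new collection...
        List<Integer> filteredStream = originalStream.stream()
                .filter(v -> v > 15).collect(Collectors.toList());

        // ...so the two are distinct objects and the original is unchanged
        System.out.println(filteredStream != originalStream); // true
        System.out.println(originalStream);                   // [10, 20, 30]
        System.out.println(filteredStream);                   // [20, 30]
    }
}
```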