Posted to issues@flink.apache.org by "Roey Shem Tov (Jira)" <ji...@apache.org> on 2020/07/20 17:01:00 UTC

[jira] [Comment Edited] (FLINK-18627) Get unmatch filter method records to side output

    [ https://issues.apache.org/jira/browse/FLINK-18627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161377#comment-17161377 ] 

Roey Shem Tov edited comment on FLINK-18627 at 7/20/20, 5:00 PM:
-----------------------------------------------------------------

Hey [~sjwiesman],

Well, you're right: it should be an Improvement rather than a New Feature. I'll fix it.
My issue today: you have a DataStream and want to apply several filter functions to it (in this example, the filter functions are lambdas).

For example:
{code:java}
datastream
    .filter(i -> i % 2 == 0)
    .filter(i -> i % 3 == 0)
    .filter(i -> i % 4 == 0)
    .filter(i -> i % 5 == 0);
{code}
There is currently no easy way to collect all the filtered-out records into a single side output (or into multiple side outputs).
My proposed improvement is that filter would return a FilteredDataStream/FilteredSingleOutputStreamOperator (instead of DataStream/SingleOutputStreamOperator), so that after each filter you could easily send the filtered-out records to a side output, for example:

{code:java}
// sideOutFilteredRecords(...) is the proposed API; it is not part of Flink today.
final OutputTag<Integer> corruptedData = new OutputTag<Integer>("side-output"){};

datastream
    .filter(i -> i % 2 == 0).sideOutFilteredRecords(corruptedData)
    .filter(i -> i % 3 == 0).sideOutFilteredRecords(corruptedData)
    .filter(i -> i % 4 == 0).sideOutFilteredRecords(corruptedData)
    .filter(i -> i % 5 == 0).sideOutFilteredRecords(corruptedData);
{code}
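To make the proposed semantics concrete without a Flink dependency, here is a minimal plain-Java sketch. FilteredPipeline and its filter(predicate, tag) method are hypothetical stand-ins for the proposed FilteredSingleOutputStreamOperator/sideOutFilteredRecords; side outputs are modeled as in-memory lists keyed by tag, and nothing here is Flink API. Each stage keeps the matching records and, instead of dropping the rest, appends them to the named side output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, Flink-free model of "filter + side-output the rejected records".
final class FilteredPipeline<T> {
    private List<T> main;                                   // records still on the main output
    private final Map<String, List<T>> sideOutputs = new LinkedHashMap<>();

    FilteredPipeline(List<T> input) {
        this.main = new ArrayList<>(input);
    }

    // Keeps records matching 'keep'; records that fail it are appended to side output 'tag'.
    FilteredPipeline<T> filter(Predicate<T> keep, String tag) {
        List<T> kept = new ArrayList<>();
        List<T> side = sideOutputs.computeIfAbsent(tag, k -> new ArrayList<>());
        for (T record : main) {
            if (keep.test(record)) {
                kept.add(record);
            } else {
                side.add(record);
            }
        }
        main = kept;
        return this;
    }

    List<T> mainOutput() {
        return main;
    }

    List<T> sideOutput(String tag) {
        return sideOutputs.getOrDefault(tag, List.of());
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 120; i++) input.add(i);

        // Same four conditions as above, all routed to one shared side output.
        FilteredPipeline<Integer> p = new FilteredPipeline<>(input)
                .filter(i -> i % 2 == 0, "corrupted-data")
                .filter(i -> i % 3 == 0, "corrupted-data")
                .filter(i -> i % 4 == 0, "corrupted-data")
                .filter(i -> i % 5 == 0, "corrupted-data");

        System.out.println(p.mainOutput());                         // [60, 120]
        System.out.println(p.sideOutput("corrupted-data").size()); // 118
    }
}
```

Every input record ends up either on the main output or in exactly one side output. Passing a different tag to each stage would instead give one side output per condition, so you could tell which filter rejected a record; for the inputs 1..120 the four stages would reject 60, 40, 10 and 8 records respectively.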

This way I can easily route all the "corrupted data" (records that were filtered out) into one or more side outputs. Today this is much harder: a FilterFunction (or RichFilterFunction) only returns a boolean and cannot emit to a side output, so you have to implement a ProcessFunction and do something like this:
{code:java}
// inside ProcessFunction#processElement(record, ctx, out)
if (statement) {
    out.collect(record);                // main output
} else {
    ctx.output(corruptedData, record);  // side output
}
{code}
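For comparison, the workaround can be modeled in plain Java as well. This is a sketch under assumptions: SingleStepRouter and its list-based outputs are illustrative names, not Flink API. In a real job the same if/else would live in ProcessFunction#processElement, with out.collect(record) for the main output and ctx.output(tag, record) for the side output. The four chained conditions are combined into one predicate, so a single step decides where each record goes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative, Flink-free model of today's single-step routing workaround.
final class SingleStepRouter {
    // The four chained filter conditions from the example, combined with AND.
    static final Predicate<Integer> STATEMENT =
            ((Predicate<Integer>) i -> i % 2 == 0)
                    .and(i -> i % 3 == 0)
                    .and(i -> i % 4 == 0)
                    .and(i -> i % 5 == 0);

    // Mirrors the body of processElement: exactly one branch fires per record.
    static void processElement(Integer record, List<Integer> out, List<Integer> corruptedData) {
        if (STATEMENT.test(record)) {
            out.add(record);            // stands in for out.collect(record)
        } else {
            corruptedData.add(record);  // stands in for ctx.output(corruptedData, record)
        }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        List<Integer> corrupted = new ArrayList<>();
        for (int i = 1; i <= 120; i++) {
            processElement(i, out, corrupted);
        }
        System.out.println(out);              // [60, 120]
        System.out.println(corrupted.size()); // 118
    }
}
```

Collapsing the chain into one step produces the same main/side split, but it loses which condition failed unless the function checks each condition separately and routes to a different OutputTag per condition, which is the kind of boilerplate this proposal aims to avoid.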
1. Is my explanation clear enough? Do you understand the improvement I'm trying to achieve?
2. I really think this is a nice-to-have (NTH) improvement that would make things much easier. What do you think?

> Get unmatch filter method records to side output
> ------------------------------------------------
>
>                 Key: FLINK-18627
>                 URL: https://issues.apache.org/jira/browse/FLINK-18627
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Roey Shem Tov
>            Priority: Major
>             Fix For: 1.12.0
>
>
> Records that do not match a filter function should somehow be sent to a side output.
> Example:
>  
> {code:java}
> datastream
> .filter(i->i%2==0)
> .sideOutput(oddNumbersSideOutput);
> {code}
>
> That way we can filter multiple times and send the filtered records to our side output instead of dropping them immediately, which can be useful in many ways.
>
> What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)