Posted to dev@flink.apache.org by Benchao Li <li...@apache.org> on 2020/07/23 10:31:17 UTC

[DISCUSS] Align the semantic of returning null from InputFormat#nextRecord

Hi all,

I'd like to discuss the semantics of returning null from
InputFormat#nextRecord.

Currently there is no explicit Javadoc about this, and Flink handles it in
three different ways:
1. treat null as the end of input
2. skip null
3. assume that the value from InputFormat#nextRecord cannot be null
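
As a rough illustration of the first two behaviors, here is a minimal
sketch. It is not Flink code: `SimpleFormat`, `ListFormat`, and
`NullHandling` are hypothetical stand-ins that model only the `reachedEnd`
and `nextRecord` pair, and a null entry in the input stands for a record
the format chose to skip.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the two InputFormat methods under discussion.
interface SimpleFormat<T> {
    boolean reachedEnd();

    T nextRecord(T reuse); // may return null
}

// Replays a fixed list; a null entry models a record the format skipped.
class ListFormat implements SimpleFormat<String> {
    private final Iterator<String> records;

    ListFormat(String... records) {
        this.records = Arrays.asList(records).iterator();
    }

    @Override
    public boolean reachedEnd() {
        return !records.hasNext();
    }

    @Override
    public String nextRecord(String reuse) {
        return records.next();
    }
}

class NullHandling {
    // Behavior 1: the first null ends the read loop.
    static List<String> nullEndsInput(SimpleFormat<String> format) {
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            String record = format.nextRecord(null);
            if (record == null) {
                break;
            }
            out.add(record);
        }
        return out;
    }

    // Behavior 2: a null is skipped; only reachedEnd() ends the loop.
    static List<String> nullIsSkipped(SimpleFormat<String> format) {
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            String record = format.nextRecord(null);
            if (record != null) {
                out.add(record);
            }
        }
        return out;
    }
}
```

For the input `"a", null, "b"`, `nullEndsInput` yields only `a`, while
`nullIsSkipped` yields `a` and `b`. A caller written for behavior 3 would
instead treat the null as if it were a record.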

I quickly searched the Flink codebase for these usages (the number in
brackets is the behavior from the list above):
- org.apache.flink.api.common.operators.GenericDataSourceBase [2]
- org.apache.flink.api.java.io.CsvInputFormat [2]
- org.apache.flink.runtime.operators.DataSourceTask [2]
- org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator [2]
- org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction [1]
- org.apache.flink.table.sources.CsvTableSource [1]
- org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
- org.apache.flink.table.filesystem.FileSystemLookupFunction [3]

I think we can align these behaviors. As for which one to align on, I
personally lean toward #2.

Taking this a step further: when would InputFormat#nextRecord return null?
One scenario is that we encounter dirty data and want to skip it.
We faced the same problem with
org.apache.flink.api.common.serialization.DeserializationSchema
in the past, and in 1.11 we added a method `void deserialize(byte[]
message, Collector<T> out)`.
Its default behavior is to ignore a null return value.

Could we then also add a method `void nextRecord(OT reuse, Collector<OT>
collector)` to InputFormat?
Adding this method would let us return multiple records in one call,
which makes implementing an InputFormat very flexible.
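
A minimal sketch of how such an overload could look, assuming a default
implementation that mirrors the one described for DeserializationSchema
(call the single-record method, drop a null result). `RecordCollector`,
`CollectingFormat`, `CommaSplittingFormat`, and `CollectorDemo` are
hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Flink's Collector; only collect() is modeled.
interface RecordCollector<T> {
    void collect(T record);
}

// Hypothetical stand-in for InputFormat with the proposed overload added.
interface CollectingFormat<T> {
    boolean reachedEnd();

    T nextRecord(T reuse); // may return null for dirty data

    // Proposed overload: by default, call the single-record method and
    // drop a null result instead of forwarding it.
    default void nextRecord(T reuse, RecordCollector<T> collector) {
        T record = nextRecord(reuse);
        if (record != null) {
            collector.collect(record);
        }
    }
}

// Example format: each raw line may hold several comma-separated records.
class CommaSplittingFormat implements CollectingFormat<String> {
    private final Iterator<String> lines;

    CommaSplittingFormat(String... lines) {
        this.lines = Arrays.asList(lines).iterator();
    }

    @Override
    public boolean reachedEnd() {
        return !lines.hasNext();
    }

    // Returns the next raw line, or null if the line is empty (dirty).
    @Override
    public String nextRecord(String reuse) {
        String line = lines.next();
        return line.isEmpty() ? null : line;
    }

    // Emits zero, one, or many records for a single call.
    @Override
    public void nextRecord(String reuse, RecordCollector<String> collector) {
        String line = nextRecord(reuse);
        if (line == null) {
            return;
        }
        for (String part : line.split(",")) {
            collector.collect(part);
        }
    }
}

class CollectorDemo {
    // Caller side: no null check is needed, only reachedEnd() ends the loop.
    static List<String> readAll(CollectingFormat<String> format) {
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            format.nextRecord(null, out::add);
        }
        return out;
    }
}
```

With the lines `"a,b"`, `""`, and `"c"`, `CollectorDemo.readAll` collects
`a`, `b`, and `c`: the first call emits two records and the second emits
none.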

WDYT?

-- 

Best,
Benchao Li

Re: [DISCUSS] Align the semantic of returning null from InputFormat#nextRecord

Posted by Benchao Li <li...@apache.org>.
Hi Jingsong,

Thanks for the input. Our current implementation is very similar to the
approach you posted.

By the way, we are working on unifying streaming and batch using our Flink
SQL engine, and hope to contribute more in this area.

On Thu, Jul 30, 2020 at 11:49 AM Jingsong Li <ji...@gmail.com> wrote:

> Hi Benchao,
>
> I'm very glad that you are developing the batch ecosystem~
>
> More detail for #1 can be found in `CsvInputFormat`:
> - `CsvInputFormat.readRecord` encounters dirty records (comments), so
> unfortunately it must return null.
> - Passed through unchanged, that null would also come out of `nextRecord`
> (the situation #2 handles), so `CsvInputFormat` overrides `nextRecord`
> like this:
>
> @Override
> public OUT nextRecord(OUT record) throws IOException {
>    OUT returnRecord = null;
>    do {
>       returnRecord = super.nextRecord(record);
>    } while (returnRecord == null && !reachedEnd());
>
>    return returnRecord;
> }
>
>
> Best,
> Jingsong
>
> --
> Best, Jingsong Lee
>


-- 

Best,
Benchao Li

Re: [DISCUSS] Align the semantic of returning null from InputFormat#nextRecord

Posted by Jingsong Li <ji...@gmail.com>.
Hi Benchao,

I'm very glad that you are developing the batch ecosystem~

More detail for #1 can be found in `CsvInputFormat`:
- `CsvInputFormat.readRecord` encounters dirty records (comments), so
unfortunately it must return null.
- Passed through unchanged, that null would also come out of `nextRecord`
(the situation #2 handles), so `CsvInputFormat` overrides `nextRecord`
like this:

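@Override
public OUT nextRecord(OUT record) throws IOException {
    OUT returnRecord = null;
    // Keep reading while the parent returns null (a skipped record),
    // stopping only when a record is found or the input is exhausted.
    do {
        returnRecord = super.nextRecord(record);
    } while (returnRecord == null && !reachedEnd());

    return returnRecord;
}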
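
The effect of this loop can be tried outside Flink with a small
self-contained sketch. `CommentAwareFormat` and `SkippingFormat` are
hypothetical classes; treating lines that start with `#` as comments is an
assumption made only for this example.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical base format: yields null for comment lines, so its
// nextRecord can return null in the middle of the input.
class CommentAwareFormat {
    private final Iterator<String> lines;

    CommentAwareFormat(String... lines) {
        this.lines = Arrays.asList(lines).iterator();
    }

    public boolean reachedEnd() {
        return !lines.hasNext();
    }

    public String nextRecord(String reuse) {
        String line = lines.next();
        return line.startsWith("#") ? null : line;
    }
}

// Same loop as quoted above: retry until a record is found or the input
// ends, so a null can only be returned once reachedEnd() is true.
class SkippingFormat extends CommentAwareFormat {
    SkippingFormat(String... lines) {
        super(lines);
    }

    @Override
    public String nextRecord(String reuse) {
        String returnRecord;
        do {
            returnRecord = super.nextRecord(reuse);
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }
}
```

For the lines `"# c"`, `"x"`, `"# d"`, the first call returns `x`, and the
second call returns null only after the input is exhausted, so a caller
that treats null as the end of input loses no records.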


Best,
Jingsong

On Thu, Jul 30, 2020 at 11:41 AM Benchao Li <li...@apache.org> wrote:

> Thanks Jingsong for the response.
>
> If we plan to deprecate InputFormat in the near future, I think it's OK
> to keep the current behavior as it is for now.
>
> I raised this discussion because I found that InputFormatSourceFunction
> was not working as expected while developing a batch table source
> internally.
> Our batch table source reads raw data from our internal storage, which
> has the same interface as Kafka, so we can leverage all the streaming
> formats, e.g. json/pb. It is therefore common for us to encounter dirty
> data and want to skip it, as in DeserializationSchema.
> That's why I prefer #2. Anyway, if we are going to deprecate InputFormat
> in the near future, I think there is no need to change this in the
> community.
>
> --
>
> Best,
> Benchao Li
>


-- 
Best, Jingsong Lee

Re: [DISCUSS] Align the semantic of returning null from InputFormat#nextRecord

Posted by Benchao Li <li...@apache.org>.
Thanks Jingsong for the response.

If we plan to deprecate InputFormat in the near future, I think it's OK to
keep the current behavior as it is for now.

I raised this discussion because I found that InputFormatSourceFunction
was not working as expected while developing a batch table source
internally.
Our batch table source reads raw data from our internal storage, which has
the same interface as Kafka, so we can leverage all the streaming formats,
e.g. json/pb. It is therefore common for us to encounter dirty data and
want to skip it, as in DeserializationSchema.
That's why I prefer #2. Anyway, if we are going to deprecate InputFormat in
the near future, I think there is no need to change this in the community.


On Thu, Jul 30, 2020 at 10:30 AM Jingsong Li <ji...@gmail.com> wrote:

> Hi Benchao,
>
> My understanding is that #1 treats null as the end of input.
> This means we should try our best to avoid returning null before the end
> of input in the implementation of `InputFormat`. Even if we have to return
> null, we have to return it at the end of input.
> I think this is the safest implementation. It works for both #1 and #2.
> As for #3, I think we should avoid it. Actually, both
> `CRowValuesInputFormat` and `FileSystemLookupFunction` have corresponding
> implementations, and the specific `InputFormat`s can ensure they behave
> well with respect to `reachedEnd` and `nextRecord`. Of course, we can also
> make their invocations more robust.
>
> Actually, IIUC, InputFormat is a legacy interface; the new interface
> should be FLIP-27 [1]. We are also planning to implement FileSource on
> the new interfaces.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> Best,
> Jingsong
>
> --
> Best, Jingsong Lee
>


-- 

Best,
Benchao Li

Re: [DISCUSS] Align the semantic of returning null from InputFormat#nextRecord

Posted by Jingsong Li <ji...@gmail.com>.
Hi Benchao,

My understanding is that #1 treats null as the end of input.
This means we should try our best to avoid returning null before the end of
input in the implementation of `InputFormat`. Even if we have to return
null, we have to return it at the end of input.
I think this is the safest implementation. It works for both #1 and #2.
As for #3, I think we should avoid it. Actually, both
`CRowValuesInputFormat` and `FileSystemLookupFunction` have corresponding
implementations, and the specific `InputFormat`s can ensure they behave
well with respect to `reachedEnd` and `nextRecord`. Of course, we can also
make their invocations more robust.

Actually, IIUC, InputFormat is a legacy interface; the new interface
should be FLIP-27 [1]. We are also planning to implement FileSource on the
new interfaces.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Jingsong

On Thu, Jul 23, 2020 at 6:31 PM Benchao Li <li...@apache.org> wrote:

> Hi all,
>
> I'd like to discuss the semantics of returning null from
> InputFormat#nextRecord.
>
> Currently there is no explicit Javadoc about this, and Flink handles it in
> three different ways:
> 1. treat null as the end of input
> 2. skip null
> 3. assume that the value from InputFormat#nextRecord cannot be null
>
> I quickly searched the Flink codebase for these usages:
> - org.apache.flink.api.common.operators.GenericDataSourceBase [2]
> - org.apache.flink.api.java.io.CsvInputFormat [2]
> - org.apache.flink.runtime.operators.DataSourceTask [2]
> - org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator [2]
> - org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction [1]
> - org.apache.flink.table.sources.CsvTableSource [1]
> - org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
> - org.apache.flink.table.filesystem.FileSystemLookupFunction [3]
>
> I think we can align these behaviors. As for which one to align on, I
> personally lean toward #2.
>
> Taking this a step further: when would InputFormat#nextRecord return null?
> One scenario is that we encounter dirty data and want to skip it.
> We faced the same problem with
> org.apache.flink.api.common.serialization.DeserializationSchema
> in the past, and in 1.11 we added a method `void deserialize(byte[]
> message, Collector<T> out)`.
> Its default behavior is to ignore a null return value.
>
> Could we then also add a method `void nextRecord(OT reuse, Collector<OT>
> collector)` to InputFormat?
> Adding this method would let us return multiple records in one call,
> which makes implementing an InputFormat very flexible.
>
> WDYT?
>
> --
>
> Best,
> Benchao Li
>


-- 
Best, Jingsong Lee