Posted to dev@flink.apache.org by Péter Váry <pe...@gmail.com> on 2023/04/19 12:26:34 UTC

DefaultInputSplitAssigner question

Hi Team,

Recently I ran into the DefaultInputSplitAssigner [1].
Its class javadoc states:


/**
 * This is the default implementation of the {@link InputSplitAssigner} interface. The default input
 * split assigner simply returns all input splits of an input vertex *in the order they were
 * originally computed*.
 */

The highlighted part says that the splits are returned in their original order, and we wanted
to rely on this.

On the other hand, the code does this:


        synchronized (this.splits) {
            if (this.splits.size() > 0) {
                // takes the LAST remaining split, not the first one
                next = this.splits.remove(this.splits.size() - 1);
            }
        }

This is exactly the opposite of the stated intent: the splits are returned in
reverse order :D
The change was committed almost a decade ago [2].
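To make the effect concrete, here is a minimal, self-contained sketch in plain Java (not Flink code; the class name `RemoveFromEndDemo` and the split names are made up for illustration). It drains a list the same way the snippet above does, by repeatedly removing the last element:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class for illustration only; not part of Flink.
public class RemoveFromEndDemo {

    // Drains the list by always removing the LAST element,
    // mirroring splits.remove(splits.size() - 1) in the quoted code.
    public static List<String> drainFromEnd(List<String> splits) {
        List<String> assigned = new ArrayList<>();
        synchronized (splits) {
            while (splits.size() > 0) {
                assigned.add(splits.remove(splits.size() - 1));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Splits in the order they were "originally computed".
        List<String> splits = new ArrayList<>(Arrays.asList("split-0", "split-1", "split-2"));
        System.out.println(drainFromEnd(splits)); // prints [split-2, split-1, split-0]
    }
}
```

The splits come out last-in, first-out, i.e. the reverse of the documented order.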

I can see two possible solutions:
- Change the comment and drop the ordering guarantee for the splits. This
"feature" has been there since 20 Sept 2014, so fixing it could cause issues
for anyone who relies on the current behaviour.
- Fix the return order.
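For the second option, one way to hand out splits in their original order is to take them from the head instead of the tail. The sketch below assumes a hypothetical generic `FifoSplitQueue` class wrapping an `ArrayDeque`; it is not Flink's actual class and not the change made in any PR:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;

// Hypothetical FIFO split queue, for illustration only; not Flink code.
public class FifoSplitQueue<T> {

    private final Deque<T> splits;

    public FifoSplitQueue(Collection<? extends T> originalOrder) {
        // ArrayDeque keeps the iteration order of the given collection.
        this.splits = new ArrayDeque<>(originalOrder);
    }

    // Returns the next split in its original order, or null when none are left.
    public T next() {
        synchronized (splits) {
            return splits.pollFirst();
        }
    }

    public static void main(String[] args) {
        FifoSplitQueue<String> queue =
                new FifoSplitQueue<>(Arrays.asList("split-0", "split-1", "split-2"));
        String split;
        while ((split = queue.next()) != null) {
            System.out.println(split); // split-0, then split-1, then split-2
        }
    }
}
```

Calling `remove(0)` on the existing `ArrayList` would also restore the order, but every call would then shift all remaining elements (O(n) per removal). A deque removes from the head in constant time.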

What do you think we should do in this case?

Thanks,
Peter

[1] https://github.com/apache/flink/blob/9e5f39e36a9b20a60573ff4051a9e8e8e54a78a9/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
[2] FLINK-1094: Reworked, improved, and testes split assigners: https://github.com/apache/flink/commit/c32569aed12ffa968e2c2289c2d56db262c0eba4

Re: DefaultInputSplitAssigner question

Posted by Zhu Zhu <re...@gmail.com>.
Sure, I will take a look.

Thanks,
Zhu

In reply to the message from Péter Váry <pe...@gmail.com> sent on Thu, 20 Apr 2023 at 22:58.

Re: DefaultInputSplitAssigner question

Posted by Péter Váry <pe...@gmail.com>.
Created a PR for the change: https://github.com/apache/flink/pull/22437
Could you please review it, Zhu?

Thanks,
Peter

In reply to the message from Péter Váry <pe...@gmail.com> sent on Thu, 20 Apr 2023 at 13:54.


Re: DefaultInputSplitAssigner question

Posted by Péter Váry <pe...@gmail.com>.
Thanks, Zhu, for the quick response!

The class is used in the old Iceberg Flink Source (there is a FLIP-27 version as
well) [1]; that is how I found it.

I created a Jira ticket to update the documentation [2].

Peter

[1] https://github.com/apache/iceberg/blob/fe6f3b2325bce0acba4c75f4a9e9edec6021a3b0/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java#L105
[2] FLINK-31868: Fix DefaultInputSplitAssigner javadoc for class: https://issues.apache.org/jira/browse/FLINK-31868



In reply to the message from Zhu Zhu <re...@gmail.com> sent on Thu, 20 Apr 2023 at 06:32.


Re: DefaultInputSplitAssigner question

Posted by Zhu Zhu <re...@gmail.com>.
Hi Peter,

Maybe you could try the new Flink Source API [1]? It lets you implement your
own `SplitEnumerator`, which controls how splits are assigned.

DefaultInputSplitAssigner is not a public interface, and Flink itself currently
places no requirement on the order of splits. So to me this looks more like an
outdated documentation problem. Furthermore, DefaultInputSplitAssigner is used
by legacy sources, which will be deprecated in the future.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/sources/#the-data-source-api

Thanks,
Zhu

In reply to the message from Péter Váry <pe...@gmail.com> sent on Wed, 19 Apr 2023 at 20:27.