You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dylan Adams <dy...@gmail.com> on 2018/08/07 18:25:57 UTC

Filter-Join Ordering Issue

I'm trying to use the Flink DataSet API to validate some records and have
run into an issue. My program uses joins to validate inputs against
reference data. One of the attributes I'm validating is optional, and only
needs to be validated when non-NULL. So I added a filter to prevent the
null-keyed records from being used in the validation join, and was
surprised to receive this exception:

java.lang.RuntimeException: A NullPointerException occured while accessing
a key field in a POJO. Most likely, the value grouped/joined on is null.
Field name: optionalKey
at
org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(PojoComparator.java:199)

It looks like the problem is that Flink has pushed the hash partitioning
aspect of the join before the filter for the null-keyed records and is
trying to hash the null keys. The issue can be seen in the plan
visualization:
https://raw.githubusercontent.com/dkadams/flink-plan-issue/master/plan-visualization.png

I was able to reproduce the problem in v1.4.2 and 1.5.2, with this small
project: https://github.com/dkadams/flink-plan-issue/

Is this expected behavior or a bug? FLINK-1915 seems to have the same root
problem, but with a negative performance impact instead of a
RuntimeException.

Regards,
Dylan

Re: Filter-Join Ordering Issue

Posted by Fabian Hueske <fh...@gmail.com>.
I've created FLINK-10100 [1] to track the problem and suggest a solution
and workaround.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-10100

2018-08-08 10:39 GMT+02:00 Fabian Hueske <fh...@gmail.com>:

> Hi Dylan,
>
> Yes, that's a bug.
> As you can see from the plan, the partitioning step is pushed past the
> Filter.
> This is possible, because the optimizer knows that a Filter function
> cannot modify the data (it only removes records).
>
> A workaround should be to implement the filter as a FlatMapFunction. A
> FlatMapFunction can arbitrarily modify a record (even if the return type
> stays the same).
> So the optimizer won't push the partitioning past a FlatMapFunction
> because it does not know whether the function modifies the partitioning key
> or not.
>
> Just FYI, you could annotate the FlatMapFunction and provide information
> about how it modifies the data [1] to enable certain optimizations but
> that's not what we want here.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/dev/batch/#semantic-annotations
>
> 2018-08-08 10:23 GMT+02:00 Chesnay Schepler <ch...@apache.org>:
>
>> I agree, please open a JIRA.
>>
>>
>> On 08.08.2018 05:11, vino yang wrote:
>>
>> Hi Dylan,
>>
>> I roughly looked at your job program and the DAG of the job. It seems
>> that the optimizer chose the wrong optimization execution plan.
>>
>> cc Till.
>>
>> Thanks, vino.
>>
>> Dylan Adams <dy...@gmail.com> 于2018年8月8日周三 上午2:26写道:
>>
>>> I'm trying to use the Flink DataSet API to validate some records and
>>> have run into an issue. My program uses joins to validate inputs against
>>> reference data. One of the attributes I'm validating is optional, and only
>>> needs to be validated when non-NULL. So I added a filter to prevent the
>>> null-keyed records from being used in the validation join, and was
>>> surprised to receive this exception:
>>>
>>> java.lang.RuntimeException: A NullPointerException occured while
>>> accessing a key field in a POJO. Most likely, the value grouped/joined on
>>> is null. Field name: optionalKey
>>> at org.apache.flink.api.java.typeutils.runtime.PojoComparator.
>>> hash(PojoComparator.java:199)
>>>
>>> It looks like the problem is that Flink has pushed the hash partitioning
>>> aspect of the join before the filter for the null-keyed records and is
>>> trying to hash the null keys. The issue can be seen in the plan
>>> visualization: https://raw.githubusercontent.com/dkadams/fli
>>> nk-plan-issue/master/plan-visualization.png
>>>
>>> I was able to reproduce the problem in v1.4.2 and 1.5.2, with this small
>>> project: https://github.com/dkadams/flink-plan-issue/
>>>
>>> Is this expected behavior or a bug? FLINK-1915 seems to have the same
>>> root problem, but with a negative performance impact instead of a
>>> RuntimeException.
>>>
>>> Regards,
>>> Dylan
>>>
>>
>>
>

Re: Filter-Join Ordering Issue

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Dylan,

Yes, that's a bug.
As you can see from the plan, the partitioning step is pushed past the
Filter.
This is possible, because the optimizer knows that a Filter function cannot
modify the data (it only removes records).

A workaround should be to implement the filter as a FlatMapFunction. A
FlatMapFunction can arbitrarily modify a record (even if the return type
stays the same).
So the optimizer won't push the partitioning past a FlatMapFunction because
it does not know whether the function modifies the partitioning key or not.

Just FYI, you could annotate the FlatMapFunction and provide information
about how it modifies the data [1] to enable certain optimizations but
that's not what we want here.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations

2018-08-08 10:23 GMT+02:00 Chesnay Schepler <ch...@apache.org>:

> I agree, please open a JIRA.
>
>
> On 08.08.2018 05:11, vino yang wrote:
>
> Hi Dylan,
>
> I roughly looked at your job program and the DAG of the job. It seems that
> the optimizer chose the wrong optimization execution plan.
>
> cc Till.
>
> Thanks, vino.
>
> Dylan Adams <dy...@gmail.com> 于2018年8月8日周三 上午2:26写道:
>
>> I'm trying to use the Flink DataSet API to validate some records and have
>> run into an issue. My program uses joins to validate inputs against
>> reference data. One of the attributes I'm validating is optional, and only
>> needs to be validated when non-NULL. So I added a filter to prevent the
>> null-keyed records from being used in the validation join, and was
>> surprised to receive this exception:
>>
>> java.lang.RuntimeException: A NullPointerException occured while
>> accessing a key field in a POJO. Most likely, the value grouped/joined on
>> is null. Field name: optionalKey
>> at org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(
>> PojoComparator.java:199)
>>
>> It looks like the problem is that Flink has pushed the hash partitioning
>> aspect of the join before the filter for the null-keyed records and is
>> trying to hash the null keys. The issue can be seen in the plan
>> visualization: https://raw.githubusercontent.com/dkadams/
>> flink-plan-issue/master/plan-visualization.png
>>
>> I was able to reproduce the problem in v1.4.2 and 1.5.2, with this small
>> project: https://github.com/dkadams/flink-plan-issue/
>>
>> Is this expected behavior or a bug? FLINK-1915 seems to have the same
>> root problem, but with a negative performance impact instead of a
>> RuntimeException.
>>
>> Regards,
>> Dylan
>>
>
>

Re: Filter-Join Ordering Issue

Posted by Chesnay Schepler <ch...@apache.org>.
I agree, please open a JIRA.

On 08.08.2018 05:11, vino yang wrote:
> Hi Dylan,
>
> I roughly looked at your job program and the DAG of the job. It seems 
> that the optimizer chose the wrong optimization execution plan.
>
> cc Till.
>
> Thanks, vino.
>
> Dylan Adams <dylan.adams@gmail.com <ma...@gmail.com>> 
> 于2018年8月8日周三 上午2:26写道:
>
>     I'm trying to use the Flink DataSet API to validate some records
>     and have run into an issue. My program uses joins to validate
>     inputs against reference data. One of the attributes I'm
>     validating is optional, and only needs to be validated when
>     non-NULL. So I added a filter to prevent the null-keyed records
>     from being used in the validation join, and was surprised to
>     receive this exception:
>
>     java.lang.RuntimeException: A NullPointerException occured while
>     accessing a key field in a POJO. Most likely, the value
>     grouped/joined on is null. Field name: optionalKey
>     at
>     org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(PojoComparator.java:199)
>
>     It looks like the problem is that Flink has pushed the hash
>     partitioning aspect of the join before the filter for the
>     null-keyed records and is trying to hash the null keys. The issue
>     can be seen in the plan visualization:
>     https://raw.githubusercontent.com/dkadams/flink-plan-issue/master/plan-visualization.png
>
>     I was able to reproduce the problem in v1.4.2 and 1.5.2, with this
>     small project: https://github.com/dkadams/flink-plan-issue/
>
>     Is this expected behavior or a bug? FLINK-1915 seems to have the
>     same root problem, but with a negative performance impact instead
>     of a RuntimeException.
>
>     Regards,
>     Dylan
>


Re: Filter-Join Ordering Issue

Posted by vino yang <ya...@gmail.com>.
Hi Dylan,

I roughly looked at your job program and the DAG of the job. It seems that
the optimizer chose the wrong optimization execution plan.

cc Till.

Thanks, vino.

Dylan Adams <dy...@gmail.com> 于2018年8月8日周三 上午2:26写道:

> I'm trying to use the Flink DataSet API to validate some records and have
> run into an issue. My program uses joins to validate inputs against
> reference data. One of the attributes I'm validating is optional, and only
> needs to be validated when non-NULL. So I added a filter to prevent the
> null-keyed records from being used in the validation join, and was
> surprised to receive this exception:
>
> java.lang.RuntimeException: A NullPointerException occured while accessing
> a key field in a POJO. Most likely, the value grouped/joined on is null.
> Field name: optionalKey
> at
> org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(PojoComparator.java:199)
>
> It looks like the problem is that Flink has pushed the hash partitioning
> aspect of the join before the filter for the null-keyed records and is
> trying to hash the null keys. The issue can be seen in the plan
> visualization:
> https://raw.githubusercontent.com/dkadams/flink-plan-issue/master/plan-visualization.png
>
> I was able to reproduce the problem in v1.4.2 and 1.5.2, with this small
> project: https://github.com/dkadams/flink-plan-issue/
>
> Is this expected behavior or a bug? FLINK-1915 seems to have the same root
> problem, but with a negative performance impact instead of a
> RuntimeException.
>
> Regards,
> Dylan
>