Posted to user@flink.apache.org by Tony Wei <to...@gmail.com> on 2019/07/19 09:38:16 UTC

Re: left join failing with FlinkLogicalJoinConverter NPE

Hi,

Is there any update on this issue? I have run into the same problem as
Karl.
After I removed a query like "select collect(data) ..." from one of the joined
tables,
the SQL executed correctly without throwing any NPE.

Best regards,
Tony Wei

On Wed, Feb 27, 2019 at 12:53 PM Xingcan Cui <xi...@gmail.com> wrote:

> Hi Karl,
>
> I think this is a bug and created FLINK-11769
> <https://issues.apache.org/jira/browse/FLINK-11769> to track it.
>
> Best,
> Xingcan
>
> On Feb 26, 2019, at 2:02 PM, Karl Jin <ka...@gmail.com> wrote:
>
> I removed the multiset<map<string,string>> field and the join worked fine.
> The field was created from a Kafka source through a query that looks like
> "select collect(data) as i_data from ... group by pk"
>
> Do you think this is a bug or is this something I can get around using
> some configuration?
>
> On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui <xi...@gmail.com> wrote:
>
>> Yes. Please check that. If it's the nested type's problem, this might be
>> a bug.
>>
>> On Mon, Feb 25, 2019, 21:50 Karl Jin <ka...@gmail.com> wrote:
>>
>>> Do you think something funky might be happening with Map/Multiset types?
>>> If so, how do I deal with it? (I think I can verify by removing those
>>> columns and retrying.)
>>>
>>> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <ka...@gmail.com> wrote:
>>>
>>>> Thanks for checking in quickly,
>>>>
>>>> Below is what I got from printSchema on the two tables (left joining the
>>>> second one to the first one on uc_pk = i_uc_pk). rowtime in both is
>>>> extracted from the string field uc_update_ts.
>>>>
>>>> root
>>>>  |-- uc_pk: String
>>>>  |-- uc_update_ts: String
>>>>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>>>>  |-- uc_version: String
>>>>  |-- uc_type: String
>>>>  |-- data_parsed: Map<String, String>
>>>>
>>>> root
>>>>  |-- i_uc_pk: String
>>>>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>>>>  |-- image_count: Long
>>>>  |-- i_data: Multiset<Map<String, String>>
>>>>
>>>> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xi...@gmail.com> wrote:
>>>>
>>>>> Hi Karl,
>>>>>
>>>>> It seems that some field types of your inputs were not properly
>>>>> extracted.
>>>>> Could you share the result of `printSchema()` for your input tables?
>>>>>
>>>>> Best,
>>>>> Xingcan
>>>>>
>>>>> > On Feb 25, 2019, at 4:35 PM, Karl Jin <ka...@gmail.com> wrote:
>>>>> >
>>>>> > Hello,
>>>>> >
>>>>> > First time posting, so please let me know if the formatting isn't
>>>>> > correct, etc.
>>>>> >
>>>>> > I'm trying to left join two Kafka sources, running Flink 1.7.2 locally,
>>>>> > but I'm getting the exception below. It looks like it comes from some
>>>>> > sort of query optimization process, but I'm not sure where to start
>>>>> > investigating/debugging. I see things are marked as NONE in the object,
>>>>> > so that's a bit of a flag to me, although I don't know for sure. Any
>>>>> > pointers would be much appreciated:
>>>>> >
>>>>> > Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalJoinConverter, args [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0, $6),joinType=left)]
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>>>>> >       at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>>>>> >       at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
>>>>> >       at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>>>>> >       at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>>>>> >       at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>>>>> >       at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
>>>>> >       at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
>>>>> >       at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
>>>>> >     ...
>>>>> > Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalJoinConverter
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
>>>>> >       at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
>>>>> >       at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>>>>> >       ... 11 more
>>>>> > Caused by: java.lang.NullPointerException
>>>>> >       at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
>>>>> >       at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
>>>>> >       at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
>>>>> >       at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
>>>>> >       at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
>>>>> >       at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:79)
>>>>> >       at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
>>>>> >       at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
>>>>> >       at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
>>>>> >       at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateRowSize(FlinkRelNode.scala:79)
>>>>> >       at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateRowSize(FlinkLogicalJoinBase.scala:29)
>>>>> >       at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.computeSelfCost(FlinkLogicalJoinBase.scala:51)
>>>>> >       at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
>>>>> >       at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
>>>>> >       at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
>>>>> >       at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:953)
>>>>> >       at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:339)
>>>>> >       at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:322)
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1643)
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
>>>>> >       at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)
>>>>>
>>>>>
>
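
The NullPointerException in the trace above is raised in estimateDataTypeSize
(FlinkRelNode.scala:84) from a nested call at FlinkRelNode.scala:104, which
suggests a size estimator recursing into a nested type. The Scala sketch below
is a hypothetical illustration of that failure pattern, not Flink's code:
SqlType, SizeEstimator, and the byte sizes are all made-up names and values.
It assumes, without confirmation from this thread, that the estimator reads a
key type from a MULTISET as if it were a MAP; the actual cause is tracked in
FLINK-11769.

```scala
// Hypothetical stand-in for a SQL type descriptor (not Calcite's RelDataType).
// Fields that do not apply to a given type are left as null, which is the
// situation this sketch is meant to illustrate.
final case class SqlType(
    name: String,                  // e.g. "VARCHAR", "BIGINT", "MAP", "MULTISET"
    keyType: SqlType = null,       // set for MAP only
    valueType: SqlType = null,     // set for MAP only
    componentType: SqlType = null  // set for MULTISET only
)

object SizeEstimator {
  // Assumed failure pattern: MULTISET is handled like MAP, so the recursion
  // receives a null keyType and dereferences it via `t.name`.
  def naive(t: SqlType): Double = t.name match {
    case "VARCHAR" => 12.0
    case "BIGINT"  => 8.0
    case "MAP" | "MULTISET" =>
      (naive(t.keyType) + naive(t.valueType)) * 16 // 16 is an arbitrary factor
    case other =>
      throw new IllegalArgumentException(s"Unsupported type: $other")
  }

  // Variant that reads the element type of a MULTISET instead of a key type.
  def safe(t: SqlType): Double = t.name match {
    case "VARCHAR"  => 12.0
    case "BIGINT"   => 8.0
    case "MAP"      => (safe(t.keyType) + safe(t.valueType)) * 16
    case "MULTISET" => safe(t.componentType) * 16
    case other =>
      throw new IllegalArgumentException(s"Unsupported type: $other")
  }
}
```

With a Multiset<Map<String, String>> column modeled as
SqlType("MULTISET", componentType = SqlType("MAP", SqlType("VARCHAR"), SqlType("VARCHAR"))),
SizeEstimator.naive throws a NullPointerException, while SizeEstimator.safe
returns an estimate.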

Re: left join failing with FlinkLogicalJoinConverter NPE

Posted by Tony Wei <to...@gmail.com>.
Hi,

I also found a similar issue here [1].

Best,
Tony Wei
[1] https://issues.apache.org/jira/browse/FLINK-11433
