Posted to user@flink.apache.org by Karl Jin <ka...@gmail.com> on 2019/02/25 21:35:33 UTC

left join failing with FlinkLogicalJoinConverter NPE

Hello,

First time posting, so please let me know if the formatting isn't correct,
etc.

I'm trying to left join two Kafka sources, running 1.7.2 locally, but I'm
getting the exception below. It appears to fail during query optimization,
but I'm not sure where to start investigating/debugging. I see things
marked as NONE in the object, which is a bit of a red flag to me, although
I don't know for sure. Any pointers would be much appreciated:
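For context, the failing statement is essentially a plain left join between
the two Kafka-backed tables. A minimal sketch in Flink SQL (table names here
are placeholders, not the actual ones; the right-hand table carries a
MULTISET<MAP<STRING, STRING>> column produced by COLLECT, which, as it turns
out later in the thread, is what triggers the NPE):

```sql
-- Sketch only; table names are placeholders, not the real ones.
SELECT *
FROM updates u
LEFT JOIN images i
  ON u.uc_pk = i.i_uc_pk
```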

Exception in thread "main" java.lang.RuntimeException: Error while applying
rule FlinkLogicalJoinConverter, args
[rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0,
$6),joinType=left)]
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
at
org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at
org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
at
org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
at
org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
    ...
Caused by: java.lang.RuntimeException: Error occurred while applying rule
FlinkLogicalJoinConverter
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
at
org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
at
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
... 11 more
Caused by: java.lang.NullPointerException
at
org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
at
org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
at
org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
at
org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
at
org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
at
org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:79)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
at
org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateRowSize(FlinkRelNode.scala:79)
at
org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateRowSize(FlinkLogicalJoinBase.scala:29)
at
org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.computeSelfCost(FlinkLogicalJoinBase.scala:51)
at
org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
at
GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
Source)
at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
Source)
at
org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:953)
at
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:339)
at
org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:322)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1643)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)

Re: left join failing with FlinkLogicalJoinConverter NPE

Posted by Tony Wei <to...@gmail.com>.
Hi,

I also found a similar issue here [1].

Best,
Tony Wei
[1] https://issues.apache.org/jira/browse/FLINK-11433

Tony Wei <to...@gmail.com> wrote on Fri, Jul 19, 2019 at 5:38 PM:

> Hi,
>
> Is there any update for this issue? I have had the same problem just like
> Karl's.
> After I remove query like "select collect(data) ..." from one of the
> joined tables,
> the sql can be executed correctly without throwing any NPE.
>
> Best regards,
> Tony Wei
>
> Xingcan Cui <xi...@gmail.com> wrote on Wed, Feb 27, 2019 at 12:53 PM:
>
>> Hi Karl,
>>
>> I think this is a bug and created FLINK-11769
>> <https://issues.apache.org/jira/browse/FLINK-11769> to track it.
>>
>> Best,
>> Xingcan
>>
>> On Feb 26, 2019, at 2:02 PM, Karl Jin <ka...@gmail.com> wrote:
>>
>> I removed the multiset<map<string,string>> field and the join worked
>> fine. The field was created from a Kafka source through a query that looks
>> like "select collect(data) as i_data from ... group by pk"
>>
>> Do you think this is a bug or is this something I can get around using
>> some configuration?
>>
>> On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui <xi...@gmail.com> wrote:
>>
>>> Yes. Please check that. If it's the nested type's problem, this might be
>>> a bug.
>>>
>>> On Mon, Feb 25, 2019, 21:50 Karl Jin <ka...@gmail.com> wrote:
>>>
>>>> Do you think something funky might be happening with Map/Multiset
>>>> types? If so how do I deal with it (I think I can verify by removing those
>>>> columns and retry?)?
>>>>
>>>> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <ka...@gmail.com> wrote:
>>>>
>>>>> Thanks for checking in quickly,
>>>>>
>>>>> Below is what I got on printSchema on the two tables (left joining the
>>>>> second one to the first one on uc_pk = i_uc_pk). rowtime in both are
>>>>> extracted from the string field uc_update_ts
>>>>>
>>>>> root
>>>>>  |-- uc_pk: String
>>>>>  |-- uc_update_ts: String
>>>>>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>>>>>  |-- uc_version: String
>>>>>  |-- uc_type: String
>>>>>  |-- data_parsed: Map<String, String>
>>>>>
>>>>> root
>>>>>  |-- i_uc_pk: String
>>>>>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>>>>>  |-- image_count: Long
>>>>>  |-- i_data: Multiset<Map<String, String>>
>>>>>
>>>>> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Karl,
>>>>>>
>>>>>> It seems that some field types of your inputs were not properly
>>>>>> extracted.
>>>>>> Could you share the result of `printSchema()` for your input tables?
>>>>>>
>>>>>> Best,
>>>>>> Xingcan
>>>>>>
>>>>>> > On Feb 25, 2019, at 4:35 PM, Karl Jin <ka...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hello,
>>>>>> >
>>>>>> > First time posting, so please let me know if the formatting isn't
>>>>>> correct, etc.
>>>>>> >
>>>>>> > I'm trying to left join two Kafka sources, running 1.7.2 locally,
>>>>>> but getting the below exception. Looks like some sort of query optimization
>>>>>> process but I'm not sure where to start investigating/debugging. I see
>>>>>> things are marked as NONE in the object so that's a bit of a flag to me,
>>>>>> although I don't know for sure. Any pointer would be much appreciated:
>>>>>> >
>>>>>> > [full stack trace identical to the original message above]
>>>>>>
>>>>>>
>>

Re: left join failing with FlinkLogicalJoinConverter NPE

Posted by Tony Wei <to...@gmail.com>.
Hi,

Is there any update on this issue? I have run into the same problem as
Karl. After I removed the "select collect(data) ..." query from one of the
joined tables, the SQL executed correctly without throwing any NPE.

Best regards,
Tony Wei

Xingcan Cui <xi...@gmail.com> wrote on Wed, Feb 27, 2019 at 12:53 PM:

> Hi Karl,
>
> I think this is a bug and created FLINK-11769
> <https://issues.apache.org/jira/browse/FLINK-11769> to track it.
>
> Best,
> Xingcan
>
> On Feb 26, 2019, at 2:02 PM, Karl Jin <ka...@gmail.com> wrote:
>
> I removed the multiset<map<string,string>> field and the join worked fine.
> The field was created from a Kafka source through a query that looks like
> "select collect(data) as i_data from ... group by pk"
>
> Do you think this is a bug or is this something I can get around using
> some configuration?
>
> On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui <xi...@gmail.com> wrote:
>
>> Yes. Please check that. If it's the nested type's problem, this might be
>> a bug.
>>
>> On Mon, Feb 25, 2019, 21:50 Karl Jin <ka...@gmail.com> wrote:
>>
>>> Do you think something funky might be happening with Map/Multiset types?
>>> If so how do I deal with it (I think I can verify by removing those columns
>>> and retry?)?
>>>
>>> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <ka...@gmail.com> wrote:
>>>
>>>> Thanks for checking in quickly,
>>>>
>>>> Below is what I got on printSchema on the two tables (left joining the
>>>> second one to the first one on uc_pk = i_uc_pk). rowtime in both are
>>>> extracted from the string field uc_update_ts
>>>>
>>>> root
>>>>  |-- uc_pk: String
>>>>  |-- uc_update_ts: String
>>>>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>>>>  |-- uc_version: String
>>>>  |-- uc_type: String
>>>>  |-- data_parsed: Map<String, String>
>>>>
>>>> root
>>>>  |-- i_uc_pk: String
>>>>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>>>>  |-- image_count: Long
>>>>  |-- i_data: Multiset<Map<String, String>>
>>>>
>>>> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xi...@gmail.com> wrote:
>>>>
>>>>> Hi Karl,
>>>>>
>>>>> It seems that some field types of your inputs were not properly
>>>>> extracted.
>>>>> Could you share the result of `printSchema()` for your input tables?
>>>>>
>>>>> Best,
>>>>> Xingcan
>>>>>
>>>>> > On Feb 25, 2019, at 4:35 PM, Karl Jin <ka...@gmail.com> wrote:
>>>>> >
>>>>> > Hello,
>>>>> >
>>>>> > First time posting, so please let me know if the formatting isn't
>>>>> correct, etc.
>>>>> >
>>>>> > I'm trying to left join two Kafka sources, running 1.7.2 locally,
>>>>> but getting the below exception. Looks like some sort of query optimization
>>>>> process but I'm not sure where to start investigating/debugging. I see
>>>>> things are marked as NONE in the object so that's a bit of a flag to me,
>>>>> although I don't know for sure. Any pointer would be much appreciated:
>>>>> >
>>>>> > [full stack trace identical to the original message above]
>>>>>
>>>>>
>

Re: left join failing with FlinkLogicalJoinConverter NPE

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Karl,

I think this is a bug and created FLINK-11769 <https://issues.apache.org/jira/browse/FLINK-11769> to track it.

Best,
Xingcan

> On Feb 26, 2019, at 2:02 PM, Karl Jin <ka...@gmail.com> wrote:
> 
> I removed the multiset<map<string,string>> field and the join worked fine. The field was created from a Kafka source through a query that looks like "select collect(data) as i_data from ... group by pk"
> 
> Do you think this is a bug or is this something I can get around using some configuration?
> 
> On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui <xingcanc@gmail.com <ma...@gmail.com>> wrote:
> Yes. Please check that. If it's the nested type's problem, this might be a bug.
> 
> On Mon, Feb 25, 2019, 21:50 Karl Jin <karl.jin@gmail.com <ma...@gmail.com>> wrote:
> Do you think something funky might be happening with Map/Multiset types? If so how do I deal with it (I think I can verify by removing those columns and retry?)?
> 
> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <karl.jin@gmail.com <ma...@gmail.com>> wrote:
> Thanks for checking in quickly,
> 
> Below is what I got on printSchema on the two tables (left joining the second one to the first one on uc_pk = i_uc_pk). rowtime in both are extracted from the string field uc_update_ts
> 
> root
>  |-- uc_pk: String
>  |-- uc_update_ts: String
>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>  |-- uc_version: String
>  |-- uc_type: String
>  |-- data_parsed: Map<String, String>
> 
> root
>  |-- i_uc_pk: String
>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>  |-- image_count: Long
>  |-- i_data: Multiset<Map<String, String>>
> 
> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xingcanc@gmail.com <ma...@gmail.com>> wrote:
> Hi Karl,
> 
> It seems that some field types of your inputs were not properly extracted. 
> Could you share the result of `printSchema()` for your input tables?
> 
> Best,
> Xingcan
> 
> > On Feb 25, 2019, at 4:35 PM, Karl Jin <karl.jin@gmail.com <ma...@gmail.com>> wrote:
> > 
> > Hello,
> > 
> > First time posting, so please let me know if the formatting isn't correct, etc.
> > 
> > I'm trying to left join two Kafka sources, running 1.7.2 locally, but getting the below exception. Looks like some sort of query optimization process but I'm not sure where to start investigating/debugging. I see things are marked as NONE in the object so that's a bit of a flag to me, although I don't know for sure. Any pointer would be much appreciated:
> > 
> > [full stack trace identical to the original message above]
> 


Re: left join failing with FlinkLogicalJoinConverter NPE

Posted by Karl Jin <ka...@gmail.com>.
I removed the multiset<map<string,string>> field and the join worked fine.
The field was created from a Kafka source through a query that looks like
"select collect(data) as i_data from ... group by pk"

Do you think this is a bug, or is it something I can work around with some
configuration?
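Until the planner bug is fixed, one possible workaround (a sketch only,
assuming the aggregation can be deferred; table and column names are
placeholders based on this thread) is to join the raw rows first and run
COLLECT after the join, so no MULTISET column ever reaches the join's cost
estimation:

```sql
-- Workaround sketch: aggregate after the join instead of before it.
-- Placeholders: updates(uc_pk, ...) and images(uc_pk, data).
SELECT u.uc_pk, COLLECT(i.data) AS i_data
FROM updates u
LEFT JOIN images i ON u.uc_pk = i.uc_pk
GROUP BY u.uc_pk
```

Note this is not exactly equivalent to joining the pre-aggregated table:
unmatched left rows contribute a NULL entry to the collected multiset, so
the result may need additional filtering.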

On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui <xi...@gmail.com> wrote:

> Yes. Please check that. If it's the nested type's problem, this might be a
> bug.
>
> On Mon, Feb 25, 2019, 21:50 Karl Jin <ka...@gmail.com> wrote:
>
>> Do you think something funky might be happening with Map/Multiset types?
>> If so how do I deal with it (I think I can verify by removing those columns
>> and retry?)?
>>
>> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <ka...@gmail.com> wrote:
>>
>>> Thanks for checking in quickly,
>>>
>>> Below is what I got on printSchema on the two tables (left joining the
>>> second one to the first one on uc_pk = i_uc_pk). rowtime in both are
>>> extracted from the string field uc_update_ts
>>>
>>> root
>>>  |-- uc_pk: String
>>>  |-- uc_update_ts: String
>>>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- uc_version: String
>>>  |-- uc_type: String
>>>  |-- data_parsed: Map<String, String>
>>>
>>> root
>>>  |-- i_uc_pk: String
>>>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>>>  |-- image_count: Long
>>>  |-- i_data: Multiset<Map<String, String>>
>>>
>>> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xi...@gmail.com> wrote:
>>>
>>>> Hi Karl,
>>>>
>>>> It seems that some field types of your inputs were not properly
>>>> extracted.
>>>> Could you share the result of `printSchema()` for your input tables?
>>>>
>>>> Best,
>>>> Xingcan
>>>>
>>>> > On Feb 25, 2019, at 4:35 PM, Karl Jin <ka...@gmail.com> wrote:
>>>> >
>>>> > Hello,
>>>> >
>>>> > First time posting, so please let me know if the formatting isn't
>>>> correct, etc.
>>>> >
>>>> > I'm trying to left join two Kafka sources, running 1.7.2 locally, but
>>>> getting the below exception. Looks like some sort of query optimization
>>>> process but I'm not sure where to start investigating/debugging. I see
>>>> things are marked as NONE in the object so that's a bit of a flag to me,
>>>> although I don't know for sure. Any pointer would be much appreciated:
>>>> > [stack trace snipped; quoted in full in the original message further down the thread]

Re: left join failing with FlinkLogicalJoinConverter NPE

Posted by Xingcan Cui <xi...@gmail.com>.
Yes, please check that. If the nested types turn out to be the problem, this
might be a bug.

On Mon, Feb 25, 2019, 21:50 Karl Jin <ka...@gmail.com> wrote:

> Do you think something funky might be happening with Map/Multiset types?
> If so how do I deal with it (I think I can verify by removing those columns
> and retry?)?
>
> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <ka...@gmail.com> wrote:
>
>> Thanks for checking in quickly,
>>
>> Below is what I got on printSchema on the two tables (left joining the
>> second one to the first one on uc_pk = i_uc_pk). rowtime in both are
>> extracted from the string field uc_update_ts
>>
>> root
>>  |-- uc_pk: String
>>  |-- uc_update_ts: String
>>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>>  |-- uc_version: String
>>  |-- uc_type: String
>>  |-- data_parsed: Map<String, String>
>>
>> root
>>  |-- i_uc_pk: String
>>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>>  |-- image_count: Long
>>  |-- i_data: Multiset<Map<String, String>>
>>
>> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xi...@gmail.com> wrote:
>>
>>> Hi Karl,
>>>
>>> It seems that some field types of your inputs were not properly
>>> extracted.
>>> Could you share the result of `printSchema()` for your input tables?
>>>
>>> Best,
>>> Xingcan
>>>
>>> > On Feb 25, 2019, at 4:35 PM, Karl Jin <ka...@gmail.com> wrote:
>>> >
>>> > Hello,
>>> >
>>> > First time posting, so please let me know if the formatting isn't
>>> correct, etc.
>>> >
>>> > I'm trying to left join two Kafka sources, running 1.7.2 locally, but
>>> getting the below exception. Looks like some sort of query optimization
>>> process but I'm not sure where to start investigating/debugging. I see
>>> things are marked as NONE in the object so that's a bit of a flag to me,
>>> although I don't know for sure. Any pointer would be much appreciated:
>>> > [stack trace snipped; quoted in full in the original message further down the thread]

Re: left join failing with FlinkLogicalJoinConverter NPE

Posted by Karl Jin <ka...@gmail.com>.
Do you think something funky might be happening with the Map/Multiset types?
If so, how do I deal with it? (I think I can verify by removing those columns
and retrying.)
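
Something along these lines is what I had in mind for the check (the column
names come from my actual schemas; the table variables are placeholders for
my real definitions):

```scala
import org.apache.flink.table.api.scala._

// Sketch of the verification: project away the Map/Multiset columns on
// both sides, then re-run the same left join to see if the NPE goes away.
val leftSlim = leftTable
  .select('uc_pk, 'uc_update_ts, 'rowtime, 'uc_version, 'uc_type) // drops data_parsed
val rightSlim = rightTable
  .select('i_uc_pk, 'i_uc_update_ts, 'image_count)                // drops i_data

val joined = leftSlim.leftOuterJoin(rightSlim, 'uc_pk === 'i_uc_pk)
```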

On Mon, Feb 25, 2019 at 6:28 PM Karl Jin <ka...@gmail.com> wrote:

> Thanks for checking in quickly,
>
> Below is what I got on printSchema on the two tables (left joining the
> second one to the first one on uc_pk = i_uc_pk). rowtime in both are
> extracted from the string field uc_update_ts
>
> root
>  |-- uc_pk: String
>  |-- uc_update_ts: String
>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>  |-- uc_version: String
>  |-- uc_type: String
>  |-- data_parsed: Map<String, String>
>
> root
>  |-- i_uc_pk: String
>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>  |-- image_count: Long
>  |-- i_data: Multiset<Map<String, String>>
>
> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xi...@gmail.com> wrote:
>
>> Hi Karl,
>>
>> It seems that some field types of your inputs were not properly
>> extracted.
>> Could you share the result of `printSchema()` for your input tables?
>>
>> Best,
>> Xingcan
>>
>> > On Feb 25, 2019, at 4:35 PM, Karl Jin <ka...@gmail.com> wrote:
>> >
>> > Hello,
>> >
>> > First time posting, so please let me know if the formatting isn't
>> correct, etc.
>> >
>> > I'm trying to left join two Kafka sources, running 1.7.2 locally, but
>> getting the below exception. Looks like some sort of query optimization
>> process but I'm not sure where to start investigating/debugging. I see
>> things are marked as NONE in the object so that's a bit of a flag to me,
>> although I don't know for sure. Any pointer would be much appreciated:
>> > [stack trace snipped; quoted in full in the original message further down the thread]

Re: left join failing with FlinkLogicalJoinConverter NPE

Posted by Karl Jin <ka...@gmail.com>.
Thanks for checking in quickly,

Below is what I got from printSchema() on the two tables (I'm left joining the
second one to the first one on uc_pk = i_uc_pk). The rowtime in both is
extracted from the string field uc_update_ts:

root
 |-- uc_pk: String
 |-- uc_update_ts: String
 |-- rowtime: TimeIndicatorTypeInfo(rowtime)
 |-- uc_version: String
 |-- uc_type: String
 |-- data_parsed: Map<String, String>

root
 |-- i_uc_pk: String
 |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
 |-- image_count: Long
 |-- i_data: Multiset<Map<String, String>>
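
For completeness, the query is essentially the following (paraphrased;
leftTable, rightTable, and tableEnv stand in for my actual definitions):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Left join the image table onto the main table on the primary key.
val result = leftTable
  .leftOuterJoin(rightTable, 'uc_pk === 'i_uc_pk)

// The exception is thrown from this conversion, during plan optimization.
val stream = tableEnv.toRetractStream[Row](result)
```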

On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui <xi...@gmail.com> wrote:

> Hi Karl,
>
> It seems that some field types of your inputs were not properly extracted.
> Could you share the result of `printSchema()` for your input tables?
>
> Best,
> Xingcan
>
> > On Feb 25, 2019, at 4:35 PM, Karl Jin <ka...@gmail.com> wrote:
> >
> > Hello,
> >
> > First time posting, so please let me know if the formatting isn't
> correct, etc.
> >
> > I'm trying to left join two Kafka sources, running 1.7.2 locally, but
> getting the below exception. Looks like some sort of query optimization
> process but I'm not sure where to start investigating/debugging. I see
> things are marked as NONE in the object so that's a bit of a flag to me,
> although I don't know for sure. Any pointer would be much appreciated:
> > [stack trace snipped; quoted in full in the original message below]

Re: left join failing with FlinkLogicalJoinConverter NPE

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Karl,

It seems that some field types of your inputs were not properly extracted. 
Could you share the result of `printSchema()` for your input tables?

Best,
Xingcan

> On Feb 25, 2019, at 4:35 PM, Karl Jin <ka...@gmail.com> wrote:
> 
> Hello,
> 
> First time posting, so please let me know if the formatting isn't correct, etc.
> 
> I'm trying to left join two Kafka sources, running 1.7.2 locally, but getting the below exception. Looks like some sort of query optimization process but I'm not sure where to start investigating/debugging. I see things are marked as NONE in the object so that's a bit of a flag to me, although I don't know for sure. Any pointer would be much appreciated:
> 
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule FlinkLogicalJoinConverter, args [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0, $6),joinType=left)]
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
> 	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
> 	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
> 	at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
> 	at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
> 	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
> 	at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
> 	at org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
> 	at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
>     ...
> Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkLogicalJoinConverter
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
> 	at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
> 	at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> 	... 11 more
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
> 	at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
> 	at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
> 	at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
> 	at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
> 	at org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:79)
> 	at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
> 	at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
> 	at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:48)
> 	at org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateRowSize(FlinkRelNode.scala:79)
> 	at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateRowSize(FlinkLogicalJoinBase.scala:29)
> 	at org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.computeSelfCost(FlinkLogicalJoinBase.scala:51)
> 	at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:162)
> 	at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
> 	at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
> 	at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:301)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.getCost(VolcanoPlanner.java:953)
> 	at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements0(RelSubset.java:339)
> 	at org.apache.calcite.plan.volcano.RelSubset.propagateCostImprovements(RelSubset.java:322)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.addRelToSet(VolcanoPlanner.java:1643)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1579)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:859)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:879)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:1755)
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:135)