You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Yuzhao Chen (JIRA)" <ji...@apache.org> on 2019/02/17 04:22:00 UTC

[jira] [Comment Edited] (FLINK-11543) Type mismatch AssertionError in FilterJoinRule

    [ https://issues.apache.org/jira/browse/FLINK-11543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16770288#comment-16770288 ] 

Yuzhao Chen edited comment on FLINK-11543 at 2/17/19 4:21 AM:
--------------------------------------------------------------

[~twalthr]
I checked out the problem and find that in RelTimeIndicatorConverter#visit(temporalJoin: LogicalTemporalTableJoin) the join condition is not materialized, so the inputRefs in it are still time attributes type. 

So i added a code snippet in RelTimeIndicatorConverter just like normal join to make the join condition materialized:

{code:java}
def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = {
    val left = temporalJoin.getLeft.accept(this)
    val right = temporalJoin.getRight.accept(this)
    val materializer = createMaterializer(left, right)

    val rewrittenTemporalJoin = temporalJoin.copy(
      temporalJoin.getTraitSet,
      temporalJoin.getCondition.accept(materializer),
      left,
      right,
      temporalJoin.getJoinType,
      temporalJoin.isSemiJoinDone)

    val indicesToMaterialize = gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)

    materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize)
  }
{code}

then the optimize process passes, but anther validation failed in TemporalJoinConditionExtractor:
{code:java}
// Some comments here
Exception in thread "main" org.apache.flink.table.api.ValidationException: Non rowtime timeAttribute [TIMESTAMP(3)] used to create TemporalTableFunction
	at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:186)
	at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
{code}


Uah.. this seems a semantic problem and not just a bug, we may need some code refactor to solve it, look forward to your suggestions.


was (Author: danny0405):
@Timo Walther
I checked out the problem and find that in RelTimeIndicatorConverter#visit(temporalJoin: LogicalTemporalTableJoin) the join condition is not materialized, so the inputRefs in it are still time attributes type. 

So i added a code snippet in RelTimeIndicatorConverter just like normal join to make the join condition materialized:

{code:java}
def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = {
    val left = temporalJoin.getLeft.accept(this)
    val right = temporalJoin.getRight.accept(this)
    val materializer = createMaterializer(left, right)

    val rewrittenTemporalJoin = temporalJoin.copy(
      temporalJoin.getTraitSet,
      temporalJoin.getCondition.accept(materializer),
      left,
      right,
      temporalJoin.getJoinType,
      temporalJoin.isSemiJoinDone)

    val indicesToMaterialize = gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)

    materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize)
  }
{code}

then the optimize process passes, but anther validation failed in TemporalJoinConditionExtractor:
{code:java}
// Some comments here
Exception in thread "main" org.apache.flink.table.api.ValidationException: Non rowtime timeAttribute [TIMESTAMP(3)] used to create TemporalTableFunction
	at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:186)
	at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
{code}


Uah.. this seems a semantic problem and not just a bug, we may need some code refactor to solve it, look forward to your suggestions.

> Type mismatch AssertionError in FilterJoinRule 
> -----------------------------------------------
>
>                 Key: FLINK-11543
>                 URL: https://issues.apache.org/jira/browse/FLINK-11543
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API &amp; SQL
>    Affects Versions: 1.7.1
>            Reporter: Timo Walther
>            Assignee: Yuzhao Chen
>            Priority: Major
>         Attachments: Test.java
>
>
> The following problem is copied from the user mailing list:
> {code}
> Exception in thread "main" java.lang.AssertionError: mismatched type $5 TIMESTAMP(3)
>     at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
>     at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
>     at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
>     at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
>     at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
>     at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
>     at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
>     at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
>     at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
>     at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
>     at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
>     at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
>     at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
>     at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
>     at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
>     at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>     at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
>     at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
>     at test.Test.main(Test.java:78) 
> {code}
> It sounds related to FLINK-10211. A runnable example is attached.
> See also: https://lists.apache.org/thread.html/9a9a979f4344111baf053a51ebfa2f2a0ba31e4d5a70e633dbcae254@%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)