You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Caizhi Weng (Jira)" <ji...@apache.org> on 2021/09/10 05:25:00 UTC

[jira] [Created] (FLINK-24239) Event time temporal join should support values from array, map, row, etc. as join key

Caizhi Weng created FLINK-24239:
-----------------------------------

             Summary: Event time temporal join should support values from array, map, row, etc. as join key
                 Key: FLINK-24239
                 URL: https://issues.apache.org/jira/browse/FLINK-24239
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
    Affects Versions: 1.12.6, 1.13.3, 1.15.0, 1.14.1
            Reporter: Caizhi Weng


This ticket is from the [mailing list|https://lists.apache.org/thread.html/r90cab9c5026e527357d58db70d7e9b5875e57b942738f032bd54bfd3%40%3Cuser-zh.flink.apache.org%3E].

Currently in event time temporal join when join keys are from an array, map or row, an exception will be thrown saying "Currently the join key in Temporal Table Join can not be empty". This is quite confusing for users as they've already set the join keys.

Add the following test case to {{TableEnvironmentITCase}} to reproduce this issue.
{code:scala}
@Test
def myTest(): Unit = {
  tEnv.executeSql(
    """
      |CREATE TABLE A (
      |  a MAP<STRING NOT NULL, INT>,
      |  ts TIMESTAMP(3),
      |  WATERMARK FOR ts AS ts
      |) WITH (
      |  'connector' = 'values'
      |)
      |""".stripMargin)
  tEnv.executeSql(
    """
      |CREATE TABLE B (
      |  id INT,
      |  ts TIMESTAMP(3),
      |  WATERMARK FOR ts AS ts
      |) WITH (
      |  'connector' = 'values'
      |)
      |""".stripMargin)
  tEnv.executeSql("SELECT * FROM A LEFT JOIN B FOR SYSTEM_TIME AS OF A.ts AS b ON A.a['ID'] = id").print()
}
{code}
The exception stack is
{code:java}
org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.

	at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
	at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:77)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1704)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:807)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1289)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:744)
	at org.apache.flink.table.api.TableEnvironmentITCase.myTest(TableEnvironmentITCase.scala:109)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
{code}
However if we change the event time temporal join to a lookup join it will be good. This is because in the temporal_join_rewrite optimization phase, lookup joins and event time temporal joins will go through different rules.
 * Lookup joins will go through LogicalCorrelateToJoinFromLookupTableRuleWithFilter (or without filter) which directly changes the correlate node to a join node.
 * Event time temporal joins, however, will go through LogicalCorrelateToJoinFromTemporalTableRuleWithFilter (or without filter). In this rule their join keys will be checked and extracted fromĀ {{joinInfo}}. The join keys of {{JoinInfo}} are limited to pure input references (see {{RelOptUtil#splitJoinCondition}}) thus causing this problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)