You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "lincoln lee (Jira)" <ji...@apache.org> on 2021/03/11 09:43:00 UTC

[jira] [Created] (FLINK-21733) WatermarkAssigner incorrectly recomputing the rowtime index which may cause ArrayIndexOutOfBoundsException

lincoln lee created FLINK-21733:
-----------------------------------

             Summary: WatermarkAssigner incorrectly recomputing the rowtime index which may cause ArrayIndexOutOfBoundsException
                 Key: FLINK-21733
                 URL: https://issues.apache.org/jira/browse/FLINK-21733
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.12.2, 1.11.3
            Reporter: lincoln lee
             Fix For: 1.13.0


WatermarkAssigner incorrectly recomputes the rowtime index in its copy method, which may cause an ArrayIndexOutOfBoundsException in a case such as the following:
{code}
// Reproduction case from the FLINK-21733 report: verifying the plan of a
// projection over a watermarked source table. Per the report, this ends in an
// ArrayIndexOutOfBoundsException thrown from WatermarkAssigner.copy.
@Test
  def testProjectTransposeWatermarkAssigner(): Unit = {
    // Register the temporary source table `t1`. `ts` is a computed column
    // defined as `t`, and the watermark is declared on `ts`. The table options
    // enable watermark push-down.
    util.tableEnv.executeSql(
      s"""
         |CREATE TEMPORARY TABLE `t1` (
         |  `a`  VARCHAR,
         |  `b`  VARCHAR,
         |  `c`  VARCHAR,
         |  `d`  INT,
         |  `t`  TIMESTAMP(3),
         |  `ts` AS `t`,
         |  WATERMARK FOR `ts` AS `ts`  - INTERVAL '10' SECOND
         |) WITH (
         |  'connector' = 'values',
         |  'enable-watermark-push-down' = 'true',
         |  'bounded' = 'false',
         |  'disable-lookup' = 'true'
         |)
       """.stripMargin)

    // Select only three of the declared columns, including the rowtime
    // column `ts`.
    val projectionQuery =
      s"""
         |select a, b, ts
         |from t1
         |""".stripMargin
    util.verifyPlan(projectionQuery)
  }
{code}

Exception stack trace:
{code}
java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3

	at com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:75)
	at org.apache.calcite.util.Util$TransformingList.get(Util.java:2732)
	at org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner.copy(WatermarkAssigner.scala:68)
	at org.apache.calcite.plan.hep.HepPlanner.addRelToGraph(HepPlanner.java:805)
	at org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:158)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:282)

{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)