You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Andy (Jira)" <ji...@apache.org> on 2021/03/21 13:04:00 UTC
[jira] [Created] (FLINK-21893) A ValidationException will be thrown
out when deriveRowType of Rank if partition key is an expression result of
Input Node.
Andy created FLINK-21893:
----------------------------
Summary: A ValidationException will be thrown out when deriveRowType of Rank if partition key is an expression result of Input Node.
Key: FLINK-21893
URL: https://issues.apache.org/jira/browse/FLINK-21893
Project: Flink
Issue Type: New Feature
Components: Table SQL / Planner
Reporter: Andy
A ValidationException will be thrown out when deriveRowType of Rank if partition key is an expression result of Input Node. e.g If run the following sql, A validationException will be thrown out.
{code:java}
//代码占位符
@Test
def test(): Unit = {
val data = List(
(2001L, 2L),
(2002L, 3L)
)
val ds = failingDataSource(data).toTable(tEnv, 'video_id, 'cnt, 'proctime.proctime)
tEnv.registerTable("T", ds)
val sql =
"""
|SELECT
| video_id,
| cnt,
| rownum_2
|FROM
|(
| SELECT
| video_id,
| cnt,
| ROW_NUMBER() OVER (
| ORDER BY cnt DESC
| ) AS rownum_2
| FROM
| (
| SELECT
| video_id,
| cnt,
| ROW_NUMBER() OVER (
| PARTITION BY bucket_id
| ORDER BY cnt DESC
| ) AS rownum_1
| FROM
| (
| SELECT
| video_id,
| cnt,
| MOD(video_id, 64) as bucket_id
| FROM T
| )
| )
| WHERE rownum_1 <= 1000
|)
|WHERE rownum_2 <= 1000
|""".stripMargin
val sink = new TestingRetractSink
tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
env.execute()
}
{code}
Exception detail
{code:java}
//代码占位符
org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [$2]org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [$2]
at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:277) at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158) at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:162) at org.apache.flink.table.types.logical.RowType.of(RowType.java:294) at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:503) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:212) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:53) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlan(StreamExecRank.scala:53) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)