Posted to dev@flink.apache.org by "Shengkai Fang (Jira)" <ji...@apache.org> on 2021/08/04 13:14:00 UTC

[jira] [Created] (FLINK-23623) Over expression should allow to order by any fields in batch mode

Shengkai Fang created FLINK-23623:
-------------------------------------

             Summary: Over expression should allow to order by any fields in batch mode
                 Key: FLINK-23623
                 URL: https://issues.apache.org/jira/browse/FLINK-23623
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / API
    Affects Versions: 1.13.1
            Reporter: Shengkai Fang


Please add the following test to {{OverWindowStringExpressionTest}}:
{code:java}
  @Test
  def testRange(): Unit = {
    val util = batchTestUtil()

    util.tableEnv.executeSql(
      """
        |CREATE TABLE T1 (
        |  a int,
        |  b int,
        |  c int,
        |  d int,
        |  e int,
        |  rowtime TIMESTAMP(3),
        |  watermark for rowtime as rowtime
        |) WITH (
        |  'connector' = 'values'
        |)
        |""".stripMargin
    )

    util.addTemporarySystemFunction("weightAvgFun", classOf[WeightedAvg])
    val t = util.tableEnv.from("T1")
    val resScala = t
      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
      .select('a, 'b.sum over 'w, call("weightAvgFun", 'a, 'b) over 'w as 'myCnt)
  }
{code}
The exception stack is as follows:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute.
at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
at org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44)
{code}
This fails because {{OverCall}} checks during {{validateInput}} that the order-by key is a time attribute (rowtime). In batch mode, however, there is no rowtime attribute, so ordering an over window by any other field is rejected even though batch execution could sort by arbitrary columns.
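To illustrate the proposed behavior change, here is a minimal sketch (not Flink's actual validation code; the names {{FieldKind}} and {{validateOrderBy}} are invented for illustration) of how the order-by check could be relaxed in batch mode:

```scala
// Illustrative sketch only -- models the check that OverCall#validateInput
// performs, and how batch mode could accept any ordering field.

sealed trait FieldKind
case object TimeAttribute extends FieldKind // e.g. a rowtime column
case object RegularField extends FieldKind  // e.g. a plain INT column

def validateOrderBy(orderKey: FieldKind, isBatchMode: Boolean): Either[String, Unit] =
  orderKey match {
    case TimeAttribute => Right(())
    // Proposed improvement: in batch mode any field can be sorted on.
    case RegularField if isBatchMode => Right(())
    // Current behavior: streaming requires a time attribute.
    case RegularField =>
      Left("Ordering must be defined on a time attribute.")
  }
```

With this relaxation, the test above would pass in batch mode while streaming jobs would keep the existing time-attribute requirement.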



--
This message was sent by Atlassian Jira
(v8.3.4#803005)