Posted to dev@flink.apache.org by "chuncheng wu (Jira)" <ji...@apache.org> on 2022/02/09 10:26:00 UTC

[jira] [Created] (FLINK-26051) row_number =1 and Subsequent SQL has "case when" and "where" statement : The window can only be ordered in ASCENDING mode

chuncheng wu created FLINK-26051:
------------------------------------

             Summary: row_number =1 and Subsequent SQL has "case when" and "where" statement : The window can only be ordered in ASCENDING mode
                 Key: FLINK-26051
                 URL: https://issues.apache.org/jira/browse/FLINK-26051
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.12.2
            Reporter: chuncheng wu


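Hello,

I have two SQL queries. The first keeps only the rows with rn = 1 from a ROW_NUMBER() window, and the subsequent query applies "CASE WHEN" and "WHERE" to the result of the first. Together they produce the exception below. It is thrown when the logical plan is translated into the physical plan: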
{quote}_org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode._

    _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)_
    _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)_
    _at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)_
    _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)_
    _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)_
    _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)_
    _at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)_
    _at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)_
    _at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)_
    _at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)_
    _at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)_
    _at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)_
    _at scala.collection.Iterator$class.foreach(Iterator.scala:891)_
    _at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)_
    _at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)_
    _at scala.collection.AbstractIterable.foreach(Iterable.scala:54)_
    _at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)_
    _at scala.collection.AbstractTraversable.map(Traversable.scala:104)_
    _at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)_
    _at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)_
    _at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)_
    _at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)_
    _at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)_
    _at com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)_
    _at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)_
    _at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)_
    _at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)_
    _at java.base/java.lang.reflect.Method.invoke(Method.java:568)_
    _at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)_
    _at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)_
    _at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)_
    _at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)_
    _at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)_
    _at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)_
    _at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)_
    _at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)_
    _at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)_
    _at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)_
    _at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)_
    _at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)_
    _at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)_
    _at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)_
    _at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)_
    _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_
    _at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)_
    _at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)_
    _at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)_
    _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)_
    _at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)_
    _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)_
    _at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)_
    _at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)_
    _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)_
    _at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)_
    _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)_
    _at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)_
    _at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)_
    _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)_
    _at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)_
    _at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)_
    _at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)_
    _at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)_
    _at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)_
    _at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)_
    _at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)_
    _at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)_
    _at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)_
    _at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)_
    _at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)_
    _at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)_
    _at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)_
    _at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)_
    _at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)_
    _at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)_
    _at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)_
    _at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)_
    _at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)_

 
{quote}
Example code:
{quote}import org.apache.flink.api.java.tuple.Tuple12;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;

import java.sql.Timestamp;

import static org.apache.flink.table.api.Expressions.$;

public class BugTest {
    @Test
    public void testRowNumber() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings mySetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting);
        env.setParallelism(1);

        DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer,
                Integer, String, String, String>> oriStream = env.addSource(new CustomSourceRowNumber());
        Table testTable = tableEnv.fromDataStream(oriStream, $("biz_bill_no"), $("task_type"), $("task_mode"), $("parent_task_no"), $("total_stage_num"), $("current_stage_index"), $("use_pre_task_owner"), $("poi_type"), $("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), $("sowing_task_detail_id"));
        tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable);

        // SQL 0: keep one row per (dt, sowing_task_detail_id), ordered by task_type descending.
        Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery(
                "SELECT `biz_bill_no`\n" +
                ",task_type\n" +
                ",task_mode\n" +
                ",parent_task_no\n" +
                ",total_stage_num\n" +
                ",current_stage_index\n" +
                ",use_pre_task_owner\n" +
                ",poi_type\n" +
                ",biz_origin_bill_type\n" +
                ",sowing_task_no\n" +
                " FROM (\n" +
                " SELECT *,\n" +
                " ROW_NUMBER() OVER(PARTITION BY dt,sowing_task_detail_id ORDER BY task_type desc) AS rn\n" +
                " FROM wosOutSowingTaskDetail\n" +
                " ) tmp\n" +
                " WHERE rn = 1");
        System.out.println("SQL 0 Plan: ");
        System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST));
        System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE));
        tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", wosOutSowingTaskDetailLatest);

        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(wosOutSowingTaskDetailLatest, Row.class);
        // retractStream.print();

        // SQL 1: CASE WHEN and WHERE on top of the rn = 1 view; explaining this query throws the exception.
        Table resultTable = tableEnv.sqlQuery("SELECT\n" +
                "biz_bill_no\n" +
                ", CASE WHEN task_type = 21 AND task_mode = 51 THEN parent_task_no\n" +
                " WHEN task_type = 21 AND task_mode = 40 AND total_stage_num >= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN parent_task_no\n" +
                " ELSE sowing_task_no END AS parent_task_no_cw\n" +
                ",parent_task_no, sowing_task_no, task_type, task_mode, total_stage_num, current_stage_index,use_pre_task_owner \n" +
                "FROM wosOutSowingTaskDetailLatest\n" +
                "WHERE task_type = 21\n" +
                "AND task_mode IN (51, 40)\n" +
                "AND poi_type = 2\n");

        System.out.println("SQL 1 Plan: ");
        System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST));
        System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE));

        DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
        // resultStream.print();

        env.execute();
    }

    static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, Integer, Integer, String, Integer, Integer,
            Integer, Integer,
            Integer, String, String, String>> {
        // volatile because cancel() is called from a different thread than run()
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer,
                Integer, String, String, String>> sourceContext) throws Exception {
            while (isRunning) {
                // Emit two rows that share the same partition key (dt, sowing_task_detail_id), then idle.
                sourceContext.collect(Tuple12.of("xxx",21,51,"yyy",1,1,0,2,110,"zzz","aaa","bbb"));
                sourceContext.collect(Tuple12.of("xxx",21,40,"yyy",2,2,1,2,110,"zzz","aaa","bbb"));
                Thread.sleep(Integer.MAX_VALUE);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
{quote}
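For reference, the result the two queries are meant to produce on the two sample rows can be worked out without Flink. The following is only a plain-Java sketch of the intended semantics, not Flink code: the class and method names (ExpectedResultSketch, latestPerKey, parentTaskNoCw) are hypothetical, and keeping the earlier row when task_type ties is an assumption, since ROW_NUMBER() does not define an order among ties.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedResultSketch {

    // Only the columns that the two queries actually read.
    static final class TaskRow {
        final int taskType, taskMode, totalStageNum, currentStageIndex, usePreTaskOwner, poiType;
        final String parentTaskNo, sowingTaskNo, dt, sowingTaskDetailId;

        TaskRow(int taskType, int taskMode, String parentTaskNo, int totalStageNum,
                int currentStageIndex, int usePreTaskOwner, int poiType,
                String sowingTaskNo, String dt, String sowingTaskDetailId) {
            this.taskType = taskType;
            this.taskMode = taskMode;
            this.parentTaskNo = parentTaskNo;
            this.totalStageNum = totalStageNum;
            this.currentStageIndex = currentStageIndex;
            this.usePreTaskOwner = usePreTaskOwner;
            this.poiType = poiType;
            this.sowingTaskNo = sowingTaskNo;
            this.dt = dt;
            this.sowingTaskDetailId = sowingTaskDetailId;
        }
    }

    // SQL 0: ROW_NUMBER() OVER (PARTITION BY dt, sowing_task_detail_id
    //        ORDER BY task_type DESC) with rn = 1.
    static List<TaskRow> latestPerKey(List<TaskRow> rows) {
        Map<String, TaskRow> best = new LinkedHashMap<>();
        for (TaskRow r : rows) {
            String key = r.dt + "\u0000" + r.sowingTaskDetailId;
            TaskRow current = best.get(key);
            // Assumption: on a tie in task_type the earlier row is kept.
            if (current == null || r.taskType > current.taskType) {
                best.put(key, r);
            }
        }
        return new ArrayList<>(best.values());
    }

    // SQL 1: the WHERE filter, then the CASE WHEN that yields parent_task_no_cw.
    static List<String> parentTaskNoCw(List<TaskRow> rows) {
        List<String> out = new ArrayList<>();
        for (TaskRow r : rows) {
            boolean keep = r.taskType == 21
                    && (r.taskMode == 51 || r.taskMode == 40)
                    && r.poiType == 2;
            if (!keep) {
                continue;
            }
            boolean useParent = (r.taskType == 21 && r.taskMode == 51)
                    || (r.taskType == 21 && r.taskMode == 40 && r.totalStageNum >= 2
                        && r.currentStageIndex >= 2 && r.usePreTaskOwner == 1);
            out.add(useParent ? r.parentTaskNo : r.sowingTaskNo);
        }
        return out;
    }

    // The two rows emitted by CustomSourceRowNumber, reduced to the columns used.
    static List<TaskRow> sampleRows() {
        List<TaskRow> rows = new ArrayList<>();
        rows.add(new TaskRow(21, 51, "yyy", 1, 1, 0, 2, "zzz", "aaa", "bbb"));
        rows.add(new TaskRow(21, 40, "yyy", 2, 2, 1, 2, "zzz", "aaa", "bbb"));
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(parentTaskNoCw(latestPerKey(sampleRows())));
    }
}
```

Both sample rows fall into the same partition and both map to parent_task_no in the CASE expression, so the single surviving row should carry parent_task_no_cw = "yyy" whichever of the two rows is kept.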
 

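The plans printed by System.out.println are shown below. The plans for SQL 0 are printed successfully; explaining SQL 1 throws the exception: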
{quote}SQL 0 Plan: 
== Abstract Syntax Tree ==
LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9])
+- LogicalFilter(condition=[=($12, 1)])
   +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
      +- LogicalTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])

== Optimized Logical Plan ==
Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no]): rowcount = 1.0E7, cumulative cost = {3.1E8 rows, 1.78E10 cpu, 8.8E9 io, 8.8E9 network, 0.0 memory}
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]): rowcount = 1.0E7, cumulative cost = {3.0E8 rows, 1.78E10 cpu, 8.8E9 io, 8.8E9 network, 0.0 memory}
   +- Exchange(distribution=[hash[dt, sowing_task_detail_id]]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 1.77E10 cpu, 8.8E9 io, 8.8E9 network, 0.0 memory}
      +- DataStreamScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 8.8E9 io, 0.0 network, 0.0 memory}

== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: Custom Source

    Stage 2 : Operator
        content : SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id])
        ship_strategy : FORWARD

        Stage 4 : Operator
            content : Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id])
            ship_strategy : HASH

            Stage 5 : Operator
                content : Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no])
                ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9])
+- LogicalFilter(condition=[=($12, 1)])
   +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
      +- LogicalTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])

== Optimized Logical Plan ==
Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I,UA,D])
   +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], changelogMode=[I])
      +- DataStreamScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: Custom Source

    Stage 7 : Operator
        content : SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id])
        ship_strategy : FORWARD

        Stage 9 : Operator
            content : Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id])
            ship_strategy : HASH

            Stage 10 : Operator
                content : Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no])
                ship_strategy : FORWARD


SQL 1 Plan: 

org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode.
{quote}


