Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2022/04/17 03:42:00 UTC
[jira] [Updated] (FLINK-27272) The plan for query with local sort is incorrect if adaptive batch scheduler is enabled
[ https://issues.apache.org/jira/browse/FLINK-27272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-27272:
-------------------------------
Description:
Add the following test case to ForwardHashExchangeTest:
{code:java}
@Test
public void testRankWithHashShuffle() {
    util.verifyExecPlan(
            "SELECT * FROM (SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM T) WHERE rk <= 10");
}
{code}
The resulting plan is:
{code:java}
Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[a ASC, b ASC])
      +- Exchange(distribution=[hash[a]])
         +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[false], select=[a, b])
            +- Sort(orderBy=[a ASC, b ASC])
               +- TableSourceScan(table=[[default_catalog, default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
{code}
There should be an additional {{Exchange(distribution=[forward])}} node between the local {{Rank}} and the {{Sort}} below it. Otherwise, if the adaptive batch scheduler is enabled but operator chaining is disabled, the result may be wrong: the local {{Rank}} and {{Sort}} must run with the same parallelism, and without the forward exchange the adaptive batch scheduler may assign them different parallelisms.
Local sort aggregation has a similar problem.
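To illustrate why the two operators need the same parallelism, here is a minimal, self-contained sketch. It is a toy model and not Flink code: the class {{ForwardEdgeSketch}} and its helpers are hypothetical, and it assumes that when the parallelisms differ, a single {{Rank}} subtask ends up reading the mixed output of several {{Sort}} subtasks. Under that assumption, the input that {{Rank}} sees is no longer ordered by (a, b), even though each {{Sort}} subtask produced a sorted run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Toy model (hypothetical, not Flink code): rows are int[]{a, b}.
class ForwardEdgeSketch {
    // Same ordering as Sort(orderBy=[a ASC, b ASC]) in the plan.
    static final Comparator<int[]> BY_A_THEN_B =
            Comparator.<int[]>comparingInt(r -> r[0]).thenComparingInt(r -> r[1]);

    // Stands in for one Sort subtask: returns its input sorted by (a, b).
    static List<int[]> sortedCopy(List<int[]> rows) {
        List<int[]> copy = new ArrayList<>(rows);
        copy.sort(BY_A_THEN_B);
        return copy;
    }

    // Checks the precondition a local Rank relies on: input ordered by (a, b).
    static boolean isSorted(List<int[]> rows) {
        for (int i = 1; i < rows.size(); i++) {
            if (BY_A_THEN_B.compare(rows.get(i - 1), rows.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    // Assumption: models one Rank subtask reading two Sort subtasks in turn,
    // as could happen if the parallelisms differ and the edge is not one-to-one.
    static List<int[]> interleave(List<int[]> left, List<int[]> right) {
        List<int[]> out = new ArrayList<>();
        for (int i = 0; i < Math.max(left.size(), right.size()); i++) {
            if (i < left.size()) out.add(left.get(i));
            if (i < right.size()) out.add(right.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> sort0 = sortedCopy(Arrays.asList(
                new int[]{1, 5}, new int[]{1, 2}, new int[]{2, 7}));
        List<int[]> sort1 = sortedCopy(Arrays.asList(
                new int[]{1, 3}, new int[]{2, 1}, new int[]{1, 9}));

        // One-to-one (forward) edge: Rank subtask 0 sees only sort0's output.
        System.out.println(isSorted(sort0)); // prints "true"

        // Mixed edge: one Rank subtask sees both outputs interleaved.
        System.out.println(isSorted(interleave(sort0, sort1))); // prints "false"
    }
}
```

In this model, an explicit forward exchange corresponds to the first case, where each {{Rank}} subtask reads exactly one sorted run.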
> The plan for query with local sort is incorrect if adaptive batch scheduler is enabled
> --------------------------------------------------------------------------------------
>
> Key: FLINK-27272
> URL: https://issues.apache.org/jira/browse/FLINK-27272
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.15.0
> Reporter: godfrey he
> Priority: Major