You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/07/23 05:14:47 UTC
[flink] branch release-1.17 updated: [FLINK-32456][table-planner] Support JSON_OBJECTAGG & JSON_ARRAYAGG use with other aggregate functions
This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new f98969beba5 [FLINK-32456][table-planner] Support JSON_OBJECTAGG & JSON_ARRAYAGG use with other aggregate functions
f98969beba5 is described below
commit f98969beba5b695da7fe06573b98f953ce1d5fb1
Author: yunhong <33...@qq.com>
AuthorDate: Sun Jul 23 13:14:40 2023 +0800
[FLINK-32456][table-planner] Support JSON_OBJECTAGG & JSON_ARRAYAGG use with other aggregate functions
This closes #23037
---
docs/data/sql_functions.yml | 2 +-
docs/data/sql_functions_zh.yml | 2 +-
.../logical/WrapJsonAggFunctionArgumentsRule.java | 113 +++++----
.../functions/JsonAggregationFunctionsITCase.java | 90 ++++++-
.../WrapJsonAggFunctionArgumentsRuleTest.java | 21 +-
.../WrapJsonAggFunctionArgumentsRuleTest.xml | 268 ++++++++++++++++++++-
.../runtime/batch/sql/agg/SortAggITCase.scala | 36 +++
.../runtime/stream/sql/AggregateITCase.scala | 43 ++++
8 files changed, 502 insertions(+), 73 deletions(-)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index aaeee9803f9..bbbc40c6e1a 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1040,7 +1040,7 @@ aggregate:
`ON NULL` behavior defines what to do. If omitted, `ABSENT ON NULL` is assumed by default.
This function is currently not supported in `OVER` windows, unbounded session windows, or hop
- windows. And it is not supported for use with other aggregate functions.
+ windows.
```sql
-- '["Apple","Banana","Orange"]'
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 1c432c2d18a..1d1ff646ed0 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1131,7 +1131,7 @@ aggregate:
项目表达式可以是任意的,包括其他 JSON 函数。如果值为 `NULL`,则 `ON NULL` 行为定义了要执行的操作。如果省略,默认情况下假定为 `ABSENT ON NULL`。
- 此函数目前不支持 `OVER` windows、未绑定的 session windows 或 hop windows。同时,此函数不支持与其他聚合函数一起使用。
+ 此函数目前不支持 `OVER` windows、未绑定的 session windows 或 hop windows。
```sql
-- '["Apple","Banana","Orange"]'
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java
index 9194093717f..813034adf29 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java
@@ -43,7 +43,9 @@ import org.immutables.value.Value;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -80,81 +82,86 @@ public class WrapJsonAggFunctionArgumentsRule
@Override
public void onMatch(RelOptRuleCall call) {
final LogicalAggregate aggregate = call.rel(0);
- final AggregateCall aggCall = aggregate.getAggCallList().get(0);
-
final RelNode aggInput = aggregate.getInput();
final RelBuilder relBuilder = call.builder().push(aggInput);
- final List<Integer> affectedArgs = getAffectedArgs(aggCall);
- addProjections(aggregate.getCluster(), relBuilder, affectedArgs);
-
- final TargetMapping argsMapping =
- getAggArgsMapping(aggInput.getRowType().getFieldCount(), affectedArgs);
-
- final AggregateCall newAggregateCall = aggCall.transform(argsMapping);
- final LogicalAggregate newAggregate =
- aggregate.copy(
- aggregate.getTraitSet(),
- relBuilder.build(),
- aggregate.getGroupSet(),
- aggregate.getGroupSets(),
- Collections.singletonList(newAggregateCall));
- call.transformTo(newAggregate.withHints(Collections.singletonList(MARKER_HINT)));
+ final LogicalAggregate wrappedAggregate = wrapJsonAggregate(aggregate, relBuilder);
+ call.transformTo(wrappedAggregate.withHints(Collections.singletonList(MARKER_HINT)));
}
- /**
- * Returns the aggregation's arguments which need to be wrapped.
- *
- * <p>This list is a subset of {@link AggregateCall#getArgList()} as not every argument may need
- * to be wrapped into a {@link BuiltInFunctionDefinitions#JSON_STRING} call.
- *
- * <p>Duplicates (e.g. for {@code JSON_OBJECTAGG(f0 VALUE f0)}) are removed as we only need to
- * wrap them once.
- */
- private List<Integer> getAffectedArgs(AggregateCall aggCall) {
- if (aggCall.getAggregation() instanceof SqlJsonObjectAggAggFunction) {
- // For JSON_OBJECTAGG we only need to wrap its second (= value) argument
- final int valueIndex = aggCall.getArgList().get(1);
- return Collections.singletonList(valueIndex);
+ private LogicalAggregate wrapJsonAggregate(LogicalAggregate aggregate, RelBuilder relBuilder) {
+ final int inputCount = aggregate.getInput().getRowType().getFieldCount();
+ List<AggregateCall> aggCallList = new ArrayList<>(aggregate.getAggCallList());
+ // This map records, for each JSON aggregate call, the index of the argument that
+ needs to be wrapped into a BuiltInFunctionDefinitions#JSON_STRING call. It is used
+ to create newWrappedArgCallList after creating a new Project.
+ Map<Integer, Integer> wrapIndicesMap = new HashMap<>();
+ for (int i = 0; i < aggCallList.size(); i++) {
+ AggregateCall currentCall = aggCallList.get(i);
+ if (currentCall.getAggregation() instanceof SqlJsonObjectAggAggFunction) {
+ // For JSON_OBJECTAGG we only need to wrap its second (= value) argument
+ final int valueIndex = currentCall.getArgList().get(1);
+ wrapIndicesMap.put(i, valueIndex);
+ } else if (currentCall.getAggregation() instanceof SqlJsonArrayAggAggFunction) {
+ final int valueIndex = currentCall.getArgList().get(0);
+ wrapIndicesMap.put(i, valueIndex);
+ }
+ }
+
+ // Create a new Project.
+ Map<Integer, Integer> valueIndicesAfterProjection = new HashMap<>();
+ addProjections(
+ aggregate.getCluster(),
+ relBuilder,
+ wrapIndicesMap.values().stream().distinct().sorted().collect(Collectors.toList()),
+ inputCount,
+ valueIndicesAfterProjection);
+
+ List<AggregateCall> newWrappedArgCallList = new ArrayList<>(aggCallList);
+ final int newInputCount = inputCount + valueIndicesAfterProjection.size();
+ for (Integer jsonAggCallIndex : wrapIndicesMap.keySet()) {
+ final TargetMapping argsMapping =
+ Mappings.create(MappingType.BIJECTION, newInputCount, newInputCount);
+ Integer valueIndex = wrapIndicesMap.get(jsonAggCallIndex);
+ argsMapping.set(valueIndex, valueIndicesAfterProjection.get(valueIndex));
+ final AggregateCall newAggregateCall =
+ newWrappedArgCallList.get(jsonAggCallIndex).transform(argsMapping);
+ newWrappedArgCallList.set(jsonAggCallIndex, newAggregateCall);
}
- return aggCall.getArgList().stream().distinct().collect(Collectors.toList());
+ return aggregate.copy(
+ aggregate.getTraitSet(),
+ relBuilder.build(),
+ aggregate.getGroupSet(),
+ aggregate.getGroupSets(),
+ newWrappedArgCallList);
}
/**
- * Adds (wrapped) projections for affected arguments of the aggregation.
+ * Adds (wrapped) projections for affected arguments of the aggregation. For duplicate
+ * projection fields, we only wrap them once and record the conversion relationship in the map
+ * valueIndicesAfterProjection.
*
* <p>Note that we cannot override any of the projections as a field may be used multiple times,
* and in particular outside of the aggregation call. Therefore, we explicitly add the wrapped
* projection as an additional one.
*/
private void addProjections(
- RelOptCluster cluster, RelBuilder relBuilder, List<Integer> affectedArgs) {
+ RelOptCluster cluster,
+ RelBuilder relBuilder,
+ List<Integer> affectedArgs,
+ int inputCount,
+ Map<Integer, Integer> valueIndicesAfterProjection) {
final BridgingSqlFunction operandToStringOperator =
BridgingSqlFunction.of(cluster, JSON_STRING);
final List<RexNode> projects = new ArrayList<>();
- affectedArgs.stream()
- .map(argIdx -> relBuilder.call(operandToStringOperator, relBuilder.field(argIdx)))
- .forEach(projects::add);
-
- relBuilder.projectPlus(projects);
- }
-
- /**
- * Returns a {@link TargetMapping} that defines how the arguments of the aggregation must be
- * mapped such that the wrapped arguments are used instead.
- */
- private TargetMapping getAggArgsMapping(int inputCount, List<Integer> affectedArgs) {
- final int newCount = inputCount + affectedArgs.size();
-
- final TargetMapping argsMapping =
- Mappings.create(MappingType.BIJECTION, newCount, newCount);
- for (int i = 0; i < affectedArgs.size(); i++) {
- argsMapping.set(affectedArgs.get(i), inputCount + i);
+ for (Integer argIdx : affectedArgs) {
+ valueIndicesAfterProjection.put(argIdx, inputCount + projects.size());
+ projects.add(relBuilder.call(operandToStringOperator, relBuilder.field(argIdx)));
}
- return argsMapping;
+ relBuilder.projectPlus(projects);
}
private static boolean isJsonAggregation(AggregateCall aggCall) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
index d92432a80d1..cec9f237e68 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
@@ -119,6 +119,52 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
Arrays.asList(
Row.of(1, "{\"A\":0,\"B\":0}"),
Row.of(2, "{\"A\":0,\"C\":0}"))),
+ TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
+ .withDescription("Basic Json Aggregation With Other Aggs")
+ .withSource(
+ ROW(STRING(), INT()),
+ Arrays.asList(
+ Row.ofKind(INSERT, "A", 1),
+ Row.ofKind(INSERT, "B", null),
+ Row.ofKind(INSERT, "C", 3)))
+ .testResult(
+ source ->
+ "SELECT max(f1), JSON_OBJECTAGG(f0 VALUE f1) FROM "
+ + source,
+ source ->
+ source.select(
+ $("f1").max(),
+ jsonObjectAgg(JsonOnNull.NULL, $("f0"), $("f1"))),
+ ROW(INT(), VARCHAR(2000).notNull()),
+ ROW(INT(), STRING().notNull()),
+ Collections.singletonList(
+ Row.of(3, "{\"A\":1,\"B\":null,\"C\":3}"))),
+ TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
+ .withDescription("Group Json Aggregation With Other Aggs")
+ .withSource(
+ ROW(INT(), STRING(), INT()),
+ Arrays.asList(
+ Row.ofKind(INSERT, 1, "A", 1),
+ Row.ofKind(INSERT, 1, "B", 3),
+ Row.ofKind(INSERT, 2, "A", 2),
+ Row.ofKind(INSERT, 2, "C", 5)))
+ .testResult(
+ source ->
+ "SELECT f0, JSON_OBJECTAGG(f1 VALUE f2), max(f2) FROM "
+ + source
+ + " GROUP BY f0",
+ source ->
+ source.groupBy($("f0"))
+ .select(
+ $("f0"),
+ jsonObjectAgg(
+ JsonOnNull.NULL, $("f1"), $("f2")),
+ $("f2").max()),
+ ROW(INT(), VARCHAR(2000).notNull(), INT()),
+ ROW(INT(), STRING().notNull(), INT()),
+ Arrays.asList(
+ Row.of(1, "{\"A\":1,\"B\":3}", 3),
+ Row.of(2, "{\"A\":2,\"C\":5}", 5))),
// JSON_ARRAYAGG
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
@@ -163,6 +209,48 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
source -> source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
ROW(VARCHAR(2000).notNull()),
ROW(STRING().notNull()),
- Collections.singletonList(Row.of("[1,3]"))));
+ Collections.singletonList(Row.of("[1,3]"))),
+ TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
+ .withDescription("Basic Array Aggregation With Other Aggs")
+ .withSource(
+ ROW(STRING()),
+ Arrays.asList(
+ Row.ofKind(INSERT, "A"),
+ Row.ofKind(INSERT, (String) null),
+ Row.ofKind(INSERT, "C")))
+ .testResult(
+ source -> "SELECT max(f0), JSON_ARRAYAGG(f0) FROM " + source,
+ source ->
+ source.select(
+ $("f0").max(),
+ jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+ ROW(STRING(), VARCHAR(2000).notNull()),
+ ROW(STRING(), STRING().notNull()),
+ Collections.singletonList(Row.of("C", "[\"A\",\"C\"]"))),
+ TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
+ .withDescription("Group Array Aggregation With Other Aggs")
+ .withSource(
+ ROW(INT(), STRING()),
+ Arrays.asList(
+ Row.ofKind(INSERT, 1, "A"),
+ Row.ofKind(INSERT, 1, null),
+ Row.ofKind(INSERT, 2, "C"),
+ Row.ofKind(INSERT, 2, "D")))
+ .testResult(
+ source ->
+ "SELECT f0, max(f1), JSON_ARRAYAGG(f1) FROM "
+ + source
+ + " GROUP BY f0",
+ source ->
+ source.groupBy($("f0"))
+ .select(
+ $("f0"),
+ $("f1").max(),
+ jsonArrayAgg(JsonOnNull.ABSENT, $("f1"))),
+ ROW(INT(), STRING(), VARCHAR(2000).notNull()),
+ ROW(INT(), STRING(), STRING().notNull()),
+ Arrays.asList(
+ Row.of(1, "A", "[\"A\"]"),
+ Row.of(2, "D", "[\"C\",\"D\"]"))));
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java
index c19affe02a9..ba4d497fd9e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java
@@ -99,24 +99,37 @@ public class WrapJsonAggFunctionArgumentsRuleTest extends TableTestBase {
util.verifyRelPlan("SELECT f0, JSON_ARRAYAGG(f0) FROM T GROUP BY f0");
}
- @Test(expected = AssertionError.class)
+ @Test
public void testJsonObjectAggWithOtherAggs() {
util.verifyRelPlan("SELECT COUNT(*), JSON_OBJECTAGG(f1 VALUE f1) FROM T");
}
- @Test(expected = AssertionError.class)
+ @Test
public void testGroupJsonObjectAggWithOtherAggs() {
util.verifyRelPlan(
"SELECT f0, COUNT(*), JSON_OBJECTAGG(f1 VALUE f0), SUM(f2) FROM T GROUP BY f0");
}
- @Test(expected = AssertionError.class)
+ @Test
public void testJsonArrayAggWithOtherAggs() {
util.verifyRelPlan("SELECT COUNT(*), JSON_ARRAYAGG(f0) FROM T");
}
- @Test(expected = AssertionError.class)
+ @Test
public void testGroupJsonArrayAggInWithOtherAggs() {
util.verifyRelPlan("SELECT f0, COUNT(*), JSON_ARRAYAGG(f0), SUM(f2) FROM T GROUP BY f0");
}
+
+ @Test
+ public void testJsonArrayAggAndJsonObjectAggWithOtherAggs() {
+ util.verifyRelPlan(
+ "SELECT MAX(f0), JSON_OBJECTAGG(f1 VALUE f0), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f0) FROM T");
+ }
+
+ @Test
+ public void testGroupJsonArrayAggAndJsonObjectAggWithOtherAggs() {
+ util.verifyRelPlan(
+ "SELECT f0, JSON_OBJECTAGG(f1 VALUE f2), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f2),"
+ + " SUM(f2) FROM T GROUP BY f0");
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml
index b0f9f78efdb..4f7785fa546 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml
@@ -16,6 +16,127 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testGroupJsonArrayAggAndJsonObjectAggWithOtherAggs[batchMode = false]">
+ <Resource name="sql">
+ <![CDATA[SELECT f0, JSON_OBJECTAGG(f1 VALUE f2), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f2), SUM(f2) FROM T GROUP BY f0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $2)], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($1)], EXPR$3=[JSON_ARRAYAGG_ABSENT_ON_NULL($2)], EXPR$4=[SUM($2)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+GroupAggregate(groupBy=[f0], select=[f0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f4) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f3) AS EXPR$2, JSON_ARRAYAGG_ABSENT_ON_NULL($f4) AS EXPR$3, SUM(f2) AS EXPR$4])
++- Exchange(distribution=[hash[f0]])
+ +- Calc(select=[f0, f1, f2, JSON_STRING(f1) AS $f3, JSON_STRING(f2) AS $f4])
+ +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupJsonArrayAggAndJsonObjectAggWithOtherAggs[batchMode = true]">
+ <Resource name="sql">
+ <![CDATA[SELECT f0, JSON_OBJECTAGG(f1 VALUE f2), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f2), SUM(f2) FROM T GROUP BY f0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $2)], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($1)], EXPR$3=[JSON_ARRAYAGG_ABSENT_ON_NULL($2)], EXPR$4=[SUM($2)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+SortAggregate(isMerge=[false], groupBy=[f0], select=[f0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f4) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f3) AS EXPR$2, JSON_ARRAYAGG_ABSENT_ON_NULL($f4) AS EXPR$3, SUM(f2) AS EXPR$4])
++- Calc(select=[f0, f1, f2, JSON_STRING(f1) AS $f3, JSON_STRING(f2) AS $f4])
+ +- Sort(orderBy=[f0 ASC])
+ +- Exchange(distribution=[hash[f0]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupJsonArrayAggInWithOtherAggs[batchMode = false]">
+ <Resource name="sql">
+ <![CDATA[SELECT f0, COUNT(*), JSON_ARRAYAGG(f0), SUM(f2) FROM T GROUP BY f0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)], EXPR$3=[SUM($1)])
++- LogicalProject(f0=[$0], f2=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+GroupAggregate(groupBy=[f0], select=[f0, COUNT(*) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f2) AS EXPR$2, SUM(f2) AS EXPR$3])
++- Exchange(distribution=[hash[f0]])
+ +- Calc(select=[f0, f2, JSON_STRING(f0) AS $f2])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f2], metadata=[]]], fields=[f0, f2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupJsonArrayAggInWithOtherAggs[batchMode = true]">
+ <Resource name="sql">
+ <![CDATA[SELECT f0, COUNT(*), JSON_ARRAYAGG(f0), SUM(f2) FROM T GROUP BY f0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)], EXPR$3=[SUM($1)])
++- LogicalProject(f0=[$0], f2=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+SortAggregate(isMerge=[false], groupBy=[f0], select=[f0, COUNT(*) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f2) AS EXPR$2, SUM(f2) AS EXPR$3])
++- Calc(select=[f0, f2, JSON_STRING(f0) AS $f2])
+ +- Sort(orderBy=[f0 ASC])
+ +- Exchange(distribution=[hash[f0]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f2], metadata=[]]], fields=[f0, f2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupJsonObjectAggWithOtherAggs[batchMode = false]">
+ <Resource name="sql">
+ <![CDATA[SELECT f0, COUNT(*), JSON_OBJECTAGG(f1 VALUE f0), SUM(f2) FROM T GROUP BY f0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)], EXPR$3=[SUM($2)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+GroupAggregate(groupBy=[f0], select=[f0, COUNT(*) AS EXPR$1, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f3) AS EXPR$2, SUM(f2) AS EXPR$3])
++- Exchange(distribution=[hash[f0]])
+ +- Calc(select=[f0, f1, f2, JSON_STRING(f0) AS $f3])
+ +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupJsonObjectAggWithOtherAggs[batchMode = true]">
+ <Resource name="sql">
+ <![CDATA[SELECT f0, COUNT(*), JSON_OBJECTAGG(f1 VALUE f0), SUM(f2) FROM T GROUP BY f0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)], EXPR$3=[SUM($2)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+SortAggregate(isMerge=[true], groupBy=[f0], select=[f0, Final_COUNT(count1$0) AS EXPR$1, Final_JSON_OBJECTAGG_NULL_ON_NULL(EXPR$2) AS EXPR$2, Final_SUM(sum$1) AS EXPR$3])
++- Sort(orderBy=[f0 ASC])
+ +- Exchange(distribution=[hash[f0]])
+ +- LocalSortAggregate(groupBy=[f0], select=[f0, Partial_COUNT(*) AS count1$0, Partial_JSON_OBJECTAGG_NULL_ON_NULL(f1, $f3) AS EXPR$2, Partial_SUM(f2) AS sum$1])
+ +- Calc(select=[f0, f1, f2, JSON_STRING(f0) AS $f3])
+ +- Sort(orderBy=[f0 ASC])
+ +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testJsonArrayAgg[batchMode = false]">
<Resource name="sql">
<![CDATA[SELECT JSON_ARRAYAGG(f0) FROM T]]>
@@ -53,6 +174,46 @@ SortAggregate(isMerge=[false], select=[JSON_ARRAYAGG_ABSENT_ON_NULL($f1) AS EXPR
+- Calc(select=[f0, JSON_STRING(f0) AS $f1])
+- Exchange(distribution=[single])
+- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0], metadata=[]]], fields=[f0])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJsonArrayAggAndJsonObjectAggWithOtherAggs[batchMode = false]">
+ <Resource name="sql">
+ <![CDATA[SELECT MAX(f0), JSON_OBJECTAGG(f1 VALUE f0), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f0) FROM T]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[MAX($0)], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($1)], EXPR$3=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)])
++- LogicalProject(f0=[$0], f1=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+GroupAggregate(select=[MAX(f0) AS EXPR$0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f2) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f3) AS EXPR$2, JSON_ARRAYAGG_ABSENT_ON_NULL($f2) AS EXPR$3])
++- Exchange(distribution=[single])
+ +- Calc(select=[f0, f1, JSON_STRING(f0) AS $f2, JSON_STRING(f1) AS $f3])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f1], metadata=[]]], fields=[f0, f1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJsonArrayAggAndJsonObjectAggWithOtherAggs[batchMode = true]">
+ <Resource name="sql">
+ <![CDATA[SELECT MAX(f0), JSON_OBJECTAGG(f1 VALUE f0), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f0) FROM T]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[MAX($0)], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($1)], EXPR$3=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)])
++- LogicalProject(f0=[$0], f1=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+SortAggregate(isMerge=[false], select=[MAX(f0) AS EXPR$0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f2) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f3) AS EXPR$2, JSON_ARRAYAGG_ABSENT_ON_NULL($f2) AS EXPR$3])
++- Calc(select=[f0, f1, JSON_STRING(f0) AS $f2, JSON_STRING(f1) AS $f3])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f1], metadata=[]]], fields=[f0, f1])
]]>
</Resource>
</TestCase>
@@ -94,6 +255,46 @@ SortAggregate(isMerge=[false], groupBy=[f0], select=[f0, JSON_ARRAYAGG_ABSENT_ON
+- Sort(orderBy=[f0 ASC])
+- Exchange(distribution=[hash[f0]])
+- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0], metadata=[]]], fields=[f0])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJsonArrayAggWithOtherAggs[batchMode = false]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*), JSON_ARRAYAGG(f0) FROM T]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)])
++- LogicalProject(f0=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+GroupAggregate(select=[COUNT(*) AS EXPR$0, JSON_ARRAYAGG_ABSENT_ON_NULL($f1) AS EXPR$1])
++- Exchange(distribution=[single])
+ +- Calc(select=[f0, JSON_STRING(f0) AS $f1])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0], metadata=[]]], fields=[f0])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJsonArrayAggWithOtherAggs[batchMode = true]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*), JSON_ARRAYAGG(f0) FROM T]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)])
++- LogicalProject(f0=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+SortAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0, JSON_ARRAYAGG_ABSENT_ON_NULL($f1) AS EXPR$1])
++- Calc(select=[f0, JSON_STRING(f0) AS $f1])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0], metadata=[]]], fields=[f0])
]]>
</Resource>
</TestCase>
@@ -114,6 +315,47 @@ GroupAggregate(select=[JSON_OBJECTAGG_NULL_ON_NULL($f1, $f1) AS EXPR$0])
+- Exchange(distribution=[single])
+- Calc(select=[f1, JSON_STRING(f1) AS $f1])
+- TableSourceScan(table=[[default_catalog, default_database, T, project=[f1], metadata=[]]], fields=[f1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJsonObjectAgg[batchMode = true]">
+ <Resource name="sql">
+ <![CDATA[SELECT JSON_OBJECTAGG(f1 VALUE f1) FROM T]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[JSON_OBJECTAGG_NULL_ON_NULL($0, $0)])
++- LogicalProject(f1=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+SortAggregate(isMerge=[true], select=[Final_JSON_OBJECTAGG_NULL_ON_NULL(EXPR$0) AS EXPR$0])
++- Exchange(distribution=[single])
+ +- LocalSortAggregate(select=[Partial_JSON_OBJECTAGG_NULL_ON_NULL($f1, $f1) AS EXPR$0])
+ +- Calc(select=[f1, JSON_STRING(f1) AS $f1])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f1], metadata=[]]], fields=[f1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJsonObjectAggInGroupWindow[batchMode = false]">
+ <Resource name="sql">
+ <![CDATA[SELECT f0, JSON_OBJECTAGG(f1 VALUE f0) FROM T GROUP BY f0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)])
++- LogicalProject(f0=[$0], f1=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+GroupAggregate(groupBy=[f0], select=[f0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f2) AS EXPR$1])
++- Exchange(distribution=[hash[f0]])
+ +- Calc(select=[f0, f1, JSON_STRING(f0) AS $f2])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f1], metadata=[]]], fields=[f0, f1])
]]>
</Resource>
</TestCase>
@@ -140,44 +382,44 @@ SortAggregate(isMerge=[true], groupBy=[f0], select=[f0, Final_JSON_OBJECTAGG_NUL
]]>
</Resource>
</TestCase>
- <TestCase name="testJsonObjectAgg[batchMode = true]">
+ <TestCase name="testJsonObjectAggWithOtherAggs[batchMode = true]">
<Resource name="sql">
- <![CDATA[SELECT JSON_OBJECTAGG(f1 VALUE f1) FROM T]]>
+ <![CDATA[SELECT COUNT(*), JSON_OBJECTAGG(f1 VALUE f1) FROM T]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[JSON_OBJECTAGG_NULL_ON_NULL($0, $0)])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($0, $0)])
+- LogicalProject(f1=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-SortAggregate(isMerge=[true], select=[Final_JSON_OBJECTAGG_NULL_ON_NULL(EXPR$0) AS EXPR$0])
+SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0, Final_JSON_OBJECTAGG_NULL_ON_NULL(EXPR$1) AS EXPR$1])
+- Exchange(distribution=[single])
- +- LocalSortAggregate(select=[Partial_JSON_OBJECTAGG_NULL_ON_NULL($f1, $f1) AS EXPR$0])
+ +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_JSON_OBJECTAGG_NULL_ON_NULL($f1, $f1) AS EXPR$1])
+- Calc(select=[f1, JSON_STRING(f1) AS $f1])
+- TableSourceScan(table=[[default_catalog, default_database, T, project=[f1], metadata=[]]], fields=[f1])
]]>
</Resource>
</TestCase>
- <TestCase name="testJsonObjectAggInGroupWindow[batchMode = false]">
+ <TestCase name="testJsonObjectAggWithOtherAggs[batchMode = false]">
<Resource name="sql">
- <![CDATA[SELECT f0, JSON_OBJECTAGG(f1 VALUE f0) FROM T GROUP BY f0]]>
+ <![CDATA[SELECT COUNT(*), JSON_OBJECTAGG(f1 VALUE f1) FROM T]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)])
-+- LogicalProject(f0=[$0], f1=[$1])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($0, $0)])
++- LogicalProject(f1=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-GroupAggregate(groupBy=[f0], select=[f0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f2) AS EXPR$1])
-+- Exchange(distribution=[hash[f0]])
- +- Calc(select=[f0, f1, JSON_STRING(f0) AS $f2])
- +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f1], metadata=[]]], fields=[f0, f1])
+GroupAggregate(select=[COUNT(*) AS EXPR$0, JSON_OBJECTAGG_NULL_ON_NULL($f1, $f1) AS EXPR$1])
++- Exchange(distribution=[single])
+ +- Calc(select=[f1, JSON_STRING(f1) AS $f1])
+ +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f1], metadata=[]]], fields=[f1])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
index ad6e14de7ea..262fbeca2ab 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
@@ -365,6 +365,42 @@ class SortAggITCase extends AggregateITCaseBase("SortAggregate") {
Seq(row(4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L))
)
}
+
+ @Test
+ def testJsonArrayAggAndJsonObjectAggWithOtherAggs(): Unit = {
+ val sql =
+ s"""
+ |SELECT
+ | MAX(d), JSON_OBJECTAGG(g VALUE d), JSON_ARRAYAGG(d), JSON_ARRAYAGG(g)
+ |FROM Table5 WHERE d <= 3
+ |""".stripMargin
+ checkResult(
+ sql,
+ Seq(row("3, {\"ABC\":3,\"BCD\":3,\"Hallo\":1,\"Hallo Welt\":2," +
+ "\"Hallo Welt wie\":2,\"Hallo Welt wie gehts?\":3}, [1,2,2,3,3,3], " +
+ "[\"Hallo\",\"Hallo Welt\",\"Hallo Welt wie\",\"Hallo Welt wie gehts?\",\"ABC\",\"BCD\"]"))
+ )
+ }
+
+ @Test
+ def testGroupJsonArrayAggAndJsonObjectAggWithOtherAggs(): Unit = {
+ val sql =
+ s"""
+ |SELECT
+ | d, JSON_OBJECTAGG(g VALUE f), JSON_ARRAYAGG(g), JSON_ARRAYAGG(f), max(f)
+ |FROM Table5 WHERE d <= 3 GROUP BY d
+ |""".stripMargin
+ checkResult(
+ sql,
+ Seq(
+ row("1, {\"Hallo\":0}, [\"Hallo\"], [0], 0"),
+ row(
+ "2, {\"Hallo Welt\":1,\"Hallo Welt wie\":2}, [\"Hallo Welt\",\"Hallo Welt wie\"], [1,2], 2"),
+ row(
+ "3, {\"ABC\":4,\"BCD\":5,\"Hallo Welt wie gehts?\":3}, [\"Hallo Welt wie gehts?\",\"ABC\",\"BCD\"], [3,4,5], 5")
+ )
+ )
+ }
}
class MyPojoAggFunction extends AggregateFunction[MyPojo, CountAccumulator] {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 771373292b9..63483de3d80 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1796,4 +1796,47 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State
val expected = List("null,null", "15,15", "11,11")
assertEquals(expected, sink.getRetractResults)
}
+
+ @Test
+ def testJsonArrayAggAndJsonObjectAggWithOtherAggs(): Unit = {
+ val sink = new TestingRetractSink
+ val sql =
+ s"""
+ |SELECT
+ | MAX(d), JSON_OBJECTAGG(g VALUE d), JSON_ARRAYAGG(d), JSON_ARRAYAGG(g)
+ |FROM Table5 WHERE d <= 3
+ |""".stripMargin
+
+ val t = failingDataSource(TestData.tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ tEnv.createTemporaryView("Table5", t)
+ tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
+ env.execute()
+ val expected =
+ List(
+ "3,{\"ABC\":3,\"BCD\":3,\"Hallo\":1,\"Hallo Welt\":2,\"Hallo Welt wie\":2,\"Hallo Welt wie gehts?\":3}," +
+ "[1,2,2,3,3,3],[\"Hallo\",\"Hallo Welt\",\"Hallo Welt wie\",\"Hallo Welt wie gehts?\",\"ABC\",\"BCD\"]")
+ assertEquals(expected, sink.getRetractResults)
+ }
+
+ @Test
+ def testGroupJsonArrayAggAndJsonObjectAggWithOtherAggs(): Unit = {
+ val sink = new TestingRetractSink
+ val sql =
+ s"""
+ |SELECT
+ | d, JSON_OBJECTAGG(g VALUE f), JSON_ARRAYAGG(g), JSON_ARRAYAGG(f), max(f)
+ |FROM Table5 WHERE d <= 3 GROUP BY d
+ |""".stripMargin
+ val t = failingDataSource(TestData.tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+ tEnv.createTemporaryView("Table5", t)
+ tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
+ env.execute()
+ val expected =
+ List(
+ "3,{\"ABC\":4,\"BCD\":5,\"Hallo Welt wie gehts?\":3},[\"Hallo Welt wie gehts?\",\"ABC\",\"BCD\"],[3,4,5],5",
+ "1,{\"Hallo\":0},[\"Hallo\"],[0],0",
+ "2,{\"Hallo Welt\":1,\"Hallo Welt wie\":2},[\"Hallo Welt\",\"Hallo Welt wie\"],[1,2],2"
+ )
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ }
}