Posted to commits@flink.apache.org by li...@apache.org on 2023/07/23 05:14:47 UTC

[flink] branch release-1.17 updated: [FLINK-32456][table-planner] Support JSON_OBJECTAGG & JSON_ARRAYAGG use with other aggregate functions

This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new f98969beba5 [FLINK-32456][table-planner] Support JSON_OBJECTAGG & JSON_ARRAYAGG use with other aggregate functions
f98969beba5 is described below

commit f98969beba5b695da7fe06573b98f953ce1d5fb1
Author: yunhong <33...@qq.com>
AuthorDate: Sun Jul 23 13:14:40 2023 +0800

    [FLINK-32456][table-planner] Support JSON_OBJECTAGG & JSON_ARRAYAGG use with other aggregate functions
    
    This closes #23037
---
 docs/data/sql_functions.yml                        |   2 +-
 docs/data/sql_functions_zh.yml                     |   2 +-
 .../logical/WrapJsonAggFunctionArgumentsRule.java  | 113 +++++----
 .../functions/JsonAggregationFunctionsITCase.java  |  90 ++++++-
 .../WrapJsonAggFunctionArgumentsRuleTest.java      |  21 +-
 .../WrapJsonAggFunctionArgumentsRuleTest.xml       | 268 ++++++++++++++++++++-
 .../runtime/batch/sql/agg/SortAggITCase.scala      |  36 +++
 .../runtime/stream/sql/AggregateITCase.scala       |  43 ++++
 8 files changed, 502 insertions(+), 73 deletions(-)
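
As context for the diff below, the new planner and runtime tests exercise queries where JSON_OBJECTAGG / JSON_ARRAYAGG appear alongside other aggregate functions, which the planner previously rejected. A minimal sketch of the now-supported pattern, assuming a table T(f0 INT, f1 STRING, f2 INT) shaped like the planner test table:

```sql
-- Hypothetical table T mirroring the planner test schema; before FLINK-32456 mixing
-- JSON aggregates with other aggregates failed, with this change it is expected to plan and run.
SELECT
  f0,
  COUNT(*),                      -- regular aggregate
  JSON_OBJECTAGG(f1 VALUE f2),   -- JSON object aggregation
  JSON_ARRAYAGG(f1),             -- JSON array aggregation
  SUM(f2)                        -- regular aggregate
FROM T
GROUP BY f0;
```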

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index aaeee9803f9..bbbc40c6e1a 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1040,7 +1040,7 @@ aggregate:
       `ON NULL` behavior defines what to do. If omitted, `ABSENT ON NULL` is assumed by default.
 
       This function is currently not supported in `OVER` windows, unbounded session windows, or hop
-      windows. And it is not supported for use with other aggregate functions.
+      windows.
 
       ```sql
       -- '["Apple","Banana","Orange"]'
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 1c432c2d18a..1d1ff646ed0 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1131,7 +1131,7 @@ aggregate:
 
       项目表达式可以是任意的,包括其他 JSON 函数。如果值为 `NULL`,则 `ON NULL` 行为定义了要执行的操作。如果省略,默认情况下假定为 `ABSENT ON NULL`。
 
-      此函数目前不支持 `OVER` windows、未绑定的 session windows 或 hop windows。同时,此函数不支持与其他聚合函数一起使用。
+      此函数目前不支持 `OVER` windows、未绑定的 session windows 或 hop windows。
 
       ```sql
       -- '["Apple","Banana","Orange"]'
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java
index 9194093717f..813034adf29 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java
@@ -43,7 +43,9 @@ import org.immutables.value.Value;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -80,81 +82,86 @@ public class WrapJsonAggFunctionArgumentsRule
     @Override
     public void onMatch(RelOptRuleCall call) {
         final LogicalAggregate aggregate = call.rel(0);
-        final AggregateCall aggCall = aggregate.getAggCallList().get(0);
-
         final RelNode aggInput = aggregate.getInput();
         final RelBuilder relBuilder = call.builder().push(aggInput);
 
-        final List<Integer> affectedArgs = getAffectedArgs(aggCall);
-        addProjections(aggregate.getCluster(), relBuilder, affectedArgs);
-
-        final TargetMapping argsMapping =
-                getAggArgsMapping(aggInput.getRowType().getFieldCount(), affectedArgs);
-
-        final AggregateCall newAggregateCall = aggCall.transform(argsMapping);
-        final LogicalAggregate newAggregate =
-                aggregate.copy(
-                        aggregate.getTraitSet(),
-                        relBuilder.build(),
-                        aggregate.getGroupSet(),
-                        aggregate.getGroupSets(),
-                        Collections.singletonList(newAggregateCall));
-        call.transformTo(newAggregate.withHints(Collections.singletonList(MARKER_HINT)));
+        final LogicalAggregate wrappedAggregate = wrapJsonAggregate(aggregate, relBuilder);
+        call.transformTo(wrappedAggregate.withHints(Collections.singletonList(MARKER_HINT)));
     }
 
-    /**
-     * Returns the aggregation's arguments which need to be wrapped.
-     *
-     * <p>This list is a subset of {@link AggregateCall#getArgList()} as not every argument may need
-     * to be wrapped into a {@link BuiltInFunctionDefinitions#JSON_STRING} call.
-     *
-     * <p>Duplicates (e.g. for {@code JSON_OBJECTAGG(f0 VALUE f0)}) are removed as we only need to
-     * wrap them once.
-     */
-    private List<Integer> getAffectedArgs(AggregateCall aggCall) {
-        if (aggCall.getAggregation() instanceof SqlJsonObjectAggAggFunction) {
-            // For JSON_OBJECTAGG we only need to wrap its second (= value) argument
-            final int valueIndex = aggCall.getArgList().get(1);
-            return Collections.singletonList(valueIndex);
+    private LogicalAggregate wrapJsonAggregate(LogicalAggregate aggregate, RelBuilder relBuilder) {
+        final int inputCount = aggregate.getInput().getRowType().getFieldCount();
+        List<AggregateCall> aggCallList = new ArrayList<>(aggregate.getAggCallList());
+        // Maps the index of each JSON aggregate call in aggCallList to the index of the
+        // argument that needs to be wrapped into a BuiltInFunctionDefinitions#JSON_STRING call.
+        // It is used to build newWrappedArgCallList after the new Project has been created.
+        Map<Integer, Integer> wrapIndicesMap = new HashMap<>();
+        for (int i = 0; i < aggCallList.size(); i++) {
+            AggregateCall currentCall = aggCallList.get(i);
+            if (currentCall.getAggregation() instanceof SqlJsonObjectAggAggFunction) {
+                // For JSON_OBJECTAGG we only need to wrap its second (= value) argument
+                final int valueIndex = currentCall.getArgList().get(1);
+                wrapIndicesMap.put(i, valueIndex);
+            } else if (currentCall.getAggregation() instanceof SqlJsonArrayAggAggFunction) {
+                final int valueIndex = currentCall.getArgList().get(0);
+                wrapIndicesMap.put(i, valueIndex);
+            }
+        }
+
+        // Create a new Project.
+        Map<Integer, Integer> valueIndicesAfterProjection = new HashMap<>();
+        addProjections(
+                aggregate.getCluster(),
+                relBuilder,
+                wrapIndicesMap.values().stream().distinct().sorted().collect(Collectors.toList()),
+                inputCount,
+                valueIndicesAfterProjection);
+
+        List<AggregateCall> newWrappedArgCallList = new ArrayList<>(aggCallList);
+        final int newInputCount = inputCount + valueIndicesAfterProjection.size();
+        for (Integer jsonAggCallIndex : wrapIndicesMap.keySet()) {
+            final TargetMapping argsMapping =
+                    Mappings.create(MappingType.BIJECTION, newInputCount, newInputCount);
+            Integer valueIndex = wrapIndicesMap.get(jsonAggCallIndex);
+            argsMapping.set(valueIndex, valueIndicesAfterProjection.get(valueIndex));
+            final AggregateCall newAggregateCall =
+                    newWrappedArgCallList.get(jsonAggCallIndex).transform(argsMapping);
+            newWrappedArgCallList.set(jsonAggCallIndex, newAggregateCall);
         }
 
-        return aggCall.getArgList().stream().distinct().collect(Collectors.toList());
+        return aggregate.copy(
+                aggregate.getTraitSet(),
+                relBuilder.build(),
+                aggregate.getGroupSet(),
+                aggregate.getGroupSets(),
+                newWrappedArgCallList);
     }
 
     /**
-     * Adds (wrapped) projections for affected arguments of the aggregation.
+     * Adds (wrapped) projections for affected arguments of the aggregation. Duplicate fields are
+     * wrapped only once; the mapping from each original field index to its wrapped counterpart is
+     * recorded in {@code valueIndicesAfterProjection}.
      *
      * <p>Note that we cannot override any of the projections as a field may be used multiple times,
      * and in particular outside of the aggregation call. Therefore, we explicitly add the wrapped
      * projection as an additional one.
      */
     private void addProjections(
-            RelOptCluster cluster, RelBuilder relBuilder, List<Integer> affectedArgs) {
+            RelOptCluster cluster,
+            RelBuilder relBuilder,
+            List<Integer> affectedArgs,
+            int inputCount,
+            Map<Integer, Integer> valueIndicesAfterProjection) {
         final BridgingSqlFunction operandToStringOperator =
                 BridgingSqlFunction.of(cluster, JSON_STRING);
 
         final List<RexNode> projects = new ArrayList<>();
-        affectedArgs.stream()
-                .map(argIdx -> relBuilder.call(operandToStringOperator, relBuilder.field(argIdx)))
-                .forEach(projects::add);
-
-        relBuilder.projectPlus(projects);
-    }
-
-    /**
-     * Returns a {@link TargetMapping} that defines how the arguments of the aggregation must be
-     * mapped such that the wrapped arguments are used instead.
-     */
-    private TargetMapping getAggArgsMapping(int inputCount, List<Integer> affectedArgs) {
-        final int newCount = inputCount + affectedArgs.size();
-
-        final TargetMapping argsMapping =
-                Mappings.create(MappingType.BIJECTION, newCount, newCount);
-        for (int i = 0; i < affectedArgs.size(); i++) {
-            argsMapping.set(affectedArgs.get(i), inputCount + i);
+        for (Integer argIdx : affectedArgs) {
+            valueIndicesAfterProjection.put(argIdx, inputCount + projects.size());
+            projects.add(relBuilder.call(operandToStringOperator, relBuilder.field(argIdx)));
         }
 
-        return argsMapping;
+        relBuilder.projectPlus(projects);
     }
 
     private static boolean isJsonAggregation(AggregateCall aggCall) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
index d92432a80d1..cec9f237e68 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
@@ -119,6 +119,52 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                 Arrays.asList(
                                         Row.of(1, "{\"A\":0,\"B\":0}"),
                                         Row.of(2, "{\"A\":0,\"C\":0}"))),
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
+                        .withDescription("Basic Json Aggregation With Other Aggs")
+                        .withSource(
+                                ROW(STRING(), INT()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, "A", 1),
+                                        Row.ofKind(INSERT, "B", null),
+                                        Row.ofKind(INSERT, "C", 3)))
+                        .testResult(
+                                source ->
+                                        "SELECT max(f1), JSON_OBJECTAGG(f0 VALUE f1) FROM "
+                                                + source,
+                                source ->
+                                        source.select(
+                                                $("f1").max(),
+                                                jsonObjectAgg(JsonOnNull.NULL, $("f0"), $("f1"))),
+                                ROW(INT(), VARCHAR(2000).notNull()),
+                                ROW(INT(), STRING().notNull()),
+                                Collections.singletonList(
+                                        Row.of(3, "{\"A\":1,\"B\":null,\"C\":3}"))),
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
+                        .withDescription("Group Json Aggregation With Other Aggs")
+                        .withSource(
+                                ROW(INT(), STRING(), INT()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, 1, "A", 1),
+                                        Row.ofKind(INSERT, 1, "B", 3),
+                                        Row.ofKind(INSERT, 2, "A", 2),
+                                        Row.ofKind(INSERT, 2, "C", 5)))
+                        .testResult(
+                                source ->
+                                        "SELECT f0, JSON_OBJECTAGG(f1 VALUE f2), max(f2) FROM "
+                                                + source
+                                                + " GROUP BY f0",
+                                source ->
+                                        source.groupBy($("f0"))
+                                                .select(
+                                                        $("f0"),
+                                                        jsonObjectAgg(
+                                                                JsonOnNull.NULL, $("f1"), $("f2")),
+                                                        $("f2").max()),
+                                ROW(INT(), VARCHAR(2000).notNull(), INT()),
+                                ROW(INT(), STRING().notNull(), INT()),
+                                Arrays.asList(
+                                        Row.of(1, "{\"A\":1,\"B\":3}", 3),
+                                        Row.of(2, "{\"A\":2,\"C\":5}", 5))),
 
                 // JSON_ARRAYAGG
                 TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
@@ -163,6 +209,48 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                 source -> source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
                                 ROW(VARCHAR(2000).notNull()),
                                 ROW(STRING().notNull()),
-                                Collections.singletonList(Row.of("[1,3]"))));
+                                Collections.singletonList(Row.of("[1,3]"))),
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
+                        .withDescription("Basic Array Aggregation With Other Aggs")
+                        .withSource(
+                                ROW(STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, "A"),
+                                        Row.ofKind(INSERT, (String) null),
+                                        Row.ofKind(INSERT, "C")))
+                        .testResult(
+                                source -> "SELECT max(f0), JSON_ARRAYAGG(f0) FROM " + source,
+                                source ->
+                                        source.select(
+                                                $("f0").max(),
+                                                jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+                                ROW(STRING(), VARCHAR(2000).notNull()),
+                                ROW(STRING(), STRING().notNull()),
+                                Collections.singletonList(Row.of("C", "[\"A\",\"C\"]"))),
+                TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
+                        .withDescription("Group Array Aggregation With Other Aggs")
+                        .withSource(
+                                ROW(INT(), STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, 1, "A"),
+                                        Row.ofKind(INSERT, 1, null),
+                                        Row.ofKind(INSERT, 2, "C"),
+                                        Row.ofKind(INSERT, 2, "D")))
+                        .testResult(
+                                source ->
+                                        "SELECT f0, max(f1), JSON_ARRAYAGG(f1)FROM "
+                                                + source
+                                                + " GROUP BY f0",
+                                source ->
+                                        source.groupBy($("f0"))
+                                                .select(
+                                                        $("f0"),
+                                                        $("f1").max(),
+                                                        jsonArrayAgg(JsonOnNull.ABSENT, $("f1"))),
+                                ROW(INT(), STRING(), VARCHAR(2000).notNull()),
+                                ROW(INT(), STRING(), STRING().notNull()),
+                                Arrays.asList(
+                                        Row.of(1, "A", "[\"A\"]"),
+                                        Row.of(2, "D", "[\"C\",\"D\"]"))));
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java
index c19affe02a9..ba4d497fd9e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java
@@ -99,24 +99,37 @@ public class WrapJsonAggFunctionArgumentsRuleTest extends TableTestBase {
         util.verifyRelPlan("SELECT f0, JSON_ARRAYAGG(f0) FROM T GROUP BY f0");
     }
 
-    @Test(expected = AssertionError.class)
+    @Test
     public void testJsonObjectAggWithOtherAggs() {
         util.verifyRelPlan("SELECT COUNT(*), JSON_OBJECTAGG(f1 VALUE f1) FROM T");
     }
 
-    @Test(expected = AssertionError.class)
+    @Test
     public void testGroupJsonObjectAggWithOtherAggs() {
         util.verifyRelPlan(
                 "SELECT f0, COUNT(*), JSON_OBJECTAGG(f1 VALUE f0), SUM(f2) FROM T GROUP BY f0");
     }
 
-    @Test(expected = AssertionError.class)
+    @Test
     public void testJsonArrayAggWithOtherAggs() {
         util.verifyRelPlan("SELECT COUNT(*), JSON_ARRAYAGG(f0) FROM T");
     }
 
-    @Test(expected = AssertionError.class)
+    @Test
     public void testGroupJsonArrayAggInWithOtherAggs() {
         util.verifyRelPlan("SELECT f0, COUNT(*), JSON_ARRAYAGG(f0), SUM(f2) FROM T GROUP BY f0");
     }
+
+    @Test
+    public void testJsonArrayAggAndJsonObjectAggWithOtherAggs() {
+        util.verifyRelPlan(
+                "SELECT MAX(f0), JSON_OBJECTAGG(f1 VALUE f0), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f0) FROM T");
+    }
+
+    @Test
+    public void testGroupJsonArrayAggAndJsonObjectAggWithOtherAggs() {
+        util.verifyRelPlan(
+                "SELECT f0, JSON_OBJECTAGG(f1 VALUE f2), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f2),"
+                        + " SUM(f2) FROM T GROUP BY f0");
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml
index b0f9f78efdb..4f7785fa546 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml
@@ -16,6 +16,127 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testGroupJsonArrayAggAndJsonObjectAggWithOtherAggs[batchMode = false]">
+    <Resource name="sql">
+      <![CDATA[SELECT f0, JSON_OBJECTAGG(f1 VALUE f2), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f2), SUM(f2) FROM T GROUP BY f0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $2)], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($1)], EXPR$3=[JSON_ARRAYAGG_ABSENT_ON_NULL($2)], EXPR$4=[SUM($2)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(groupBy=[f0], select=[f0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f4) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f3) AS EXPR$2, JSON_ARRAYAGG_ABSENT_ON_NULL($f4) AS EXPR$3, SUM(f2) AS EXPR$4])
++- Exchange(distribution=[hash[f0]])
+   +- Calc(select=[f0, f1, f2, JSON_STRING(f1) AS $f3, JSON_STRING(f2) AS $f4])
+      +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupJsonArrayAggAndJsonObjectAggWithOtherAggs[batchMode = true]">
+    <Resource name="sql">
+      <![CDATA[SELECT f0, JSON_OBJECTAGG(f1 VALUE f2), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f2), SUM(f2) FROM T GROUP BY f0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $2)], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($1)], EXPR$3=[JSON_ARRAYAGG_ABSENT_ON_NULL($2)], EXPR$4=[SUM($2)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+SortAggregate(isMerge=[false], groupBy=[f0], select=[f0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f4) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f3) AS EXPR$2, JSON_ARRAYAGG_ABSENT_ON_NULL($f4) AS EXPR$3, SUM(f2) AS EXPR$4])
++- Calc(select=[f0, f1, f2, JSON_STRING(f1) AS $f3, JSON_STRING(f2) AS $f4])
+   +- Sort(orderBy=[f0 ASC])
+      +- Exchange(distribution=[hash[f0]])
+         +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupJsonArrayAggInWithOtherAggs[batchMode = false]">
+    <Resource name="sql">
+      <![CDATA[SELECT f0, COUNT(*), JSON_ARRAYAGG(f0), SUM(f2) FROM T GROUP BY f0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)], EXPR$3=[SUM($1)])
++- LogicalProject(f0=[$0], f2=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(groupBy=[f0], select=[f0, COUNT(*) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f2) AS EXPR$2, SUM(f2) AS EXPR$3])
++- Exchange(distribution=[hash[f0]])
+   +- Calc(select=[f0, f2, JSON_STRING(f0) AS $f2])
+      +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f2], metadata=[]]], fields=[f0, f2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupJsonArrayAggInWithOtherAggs[batchMode = true]">
+    <Resource name="sql">
+      <![CDATA[SELECT f0, COUNT(*), JSON_ARRAYAGG(f0), SUM(f2) FROM T GROUP BY f0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)], EXPR$3=[SUM($1)])
++- LogicalProject(f0=[$0], f2=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+SortAggregate(isMerge=[false], groupBy=[f0], select=[f0, COUNT(*) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f2) AS EXPR$2, SUM(f2) AS EXPR$3])
++- Calc(select=[f0, f2, JSON_STRING(f0) AS $f2])
+   +- Sort(orderBy=[f0 ASC])
+      +- Exchange(distribution=[hash[f0]])
+         +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f2], metadata=[]]], fields=[f0, f2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupJsonObjectAggWithOtherAggs[batchMode = false]">
+    <Resource name="sql">
+      <![CDATA[SELECT f0, COUNT(*), JSON_OBJECTAGG(f1 VALUE f0), SUM(f2) FROM T GROUP BY f0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)], EXPR$3=[SUM($2)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(groupBy=[f0], select=[f0, COUNT(*) AS EXPR$1, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f3) AS EXPR$2, SUM(f2) AS EXPR$3])
++- Exchange(distribution=[hash[f0]])
+   +- Calc(select=[f0, f1, f2, JSON_STRING(f0) AS $f3])
+      +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupJsonObjectAggWithOtherAggs[batchMode = true]">
+    <Resource name="sql">
+      <![CDATA[SELECT f0, COUNT(*), JSON_OBJECTAGG(f1 VALUE f0), SUM(f2) FROM T GROUP BY f0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)], EXPR$3=[SUM($2)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+SortAggregate(isMerge=[true], groupBy=[f0], select=[f0, Final_COUNT(count1$0) AS EXPR$1, Final_JSON_OBJECTAGG_NULL_ON_NULL(EXPR$2) AS EXPR$2, Final_SUM(sum$1) AS EXPR$3])
++- Sort(orderBy=[f0 ASC])
+   +- Exchange(distribution=[hash[f0]])
+      +- LocalSortAggregate(groupBy=[f0], select=[f0, Partial_COUNT(*) AS count1$0, Partial_JSON_OBJECTAGG_NULL_ON_NULL(f1, $f3) AS EXPR$2, Partial_SUM(f2) AS sum$1])
+         +- Calc(select=[f0, f1, f2, JSON_STRING(f0) AS $f3])
+            +- Sort(orderBy=[f0 ASC])
+               +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testJsonArrayAgg[batchMode = false]">
     <Resource name="sql">
       <![CDATA[SELECT JSON_ARRAYAGG(f0) FROM T]]>
@@ -53,6 +174,46 @@ SortAggregate(isMerge=[false], select=[JSON_ARRAYAGG_ABSENT_ON_NULL($f1) AS EXPR
 +- Calc(select=[f0, JSON_STRING(f0) AS $f1])
    +- Exchange(distribution=[single])
       +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0], metadata=[]]], fields=[f0])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJsonArrayAggAndJsonObjectAggWithOtherAggs[batchMode = false]">
+    <Resource name="sql">
+      <![CDATA[SELECT MAX(f0), JSON_OBJECTAGG(f1 VALUE f0), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f0) FROM T]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[MAX($0)], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($1)], EXPR$3=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)])
++- LogicalProject(f0=[$0], f1=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(select=[MAX(f0) AS EXPR$0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f2) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f3) AS EXPR$2, JSON_ARRAYAGG_ABSENT_ON_NULL($f2) AS EXPR$3])
++- Exchange(distribution=[single])
+   +- Calc(select=[f0, f1, JSON_STRING(f0) AS $f2, JSON_STRING(f1) AS $f3])
+      +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f1], metadata=[]]], fields=[f0, f1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJsonArrayAggAndJsonObjectAggWithOtherAggs[batchMode = true]">
+    <Resource name="sql">
+      <![CDATA[SELECT MAX(f0), JSON_OBJECTAGG(f1 VALUE f0), JSON_ARRAYAGG(f1), JSON_ARRAYAGG(f0) FROM T]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[MAX($0)], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)], EXPR$2=[JSON_ARRAYAGG_ABSENT_ON_NULL($1)], EXPR$3=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)])
++- LogicalProject(f0=[$0], f1=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+SortAggregate(isMerge=[false], select=[MAX(f0) AS EXPR$0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f2) AS EXPR$1, JSON_ARRAYAGG_ABSENT_ON_NULL($f3) AS EXPR$2, JSON_ARRAYAGG_ABSENT_ON_NULL($f2) AS EXPR$3])
++- Calc(select=[f0, f1, JSON_STRING(f0) AS $f2, JSON_STRING(f1) AS $f3])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f1], metadata=[]]], fields=[f0, f1])
 ]]>
     </Resource>
   </TestCase>
@@ -94,6 +255,46 @@ SortAggregate(isMerge=[false], groupBy=[f0], select=[f0, JSON_ARRAYAGG_ABSENT_ON
    +- Sort(orderBy=[f0 ASC])
       +- Exchange(distribution=[hash[f0]])
          +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0], metadata=[]]], fields=[f0])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJsonArrayAggWithOtherAggs[batchMode = false]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*), JSON_ARRAYAGG(f0) FROM T]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)])
++- LogicalProject(f0=[$0])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(select=[COUNT(*) AS EXPR$0, JSON_ARRAYAGG_ABSENT_ON_NULL($f1) AS EXPR$1])
++- Exchange(distribution=[single])
+   +- Calc(select=[f0, JSON_STRING(f0) AS $f1])
+      +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0], metadata=[]]], fields=[f0])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJsonArrayAggWithOtherAggs[batchMode = true]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*), JSON_ARRAYAGG(f0) FROM T]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)])
++- LogicalProject(f0=[$0])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+SortAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0, JSON_ARRAYAGG_ABSENT_ON_NULL($f1) AS EXPR$1])
++- Calc(select=[f0, JSON_STRING(f0) AS $f1])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0], metadata=[]]], fields=[f0])
 ]]>
     </Resource>
   </TestCase>
@@ -114,6 +315,47 @@ GroupAggregate(select=[JSON_OBJECTAGG_NULL_ON_NULL($f1, $f1) AS EXPR$0])
 +- Exchange(distribution=[single])
    +- Calc(select=[f1, JSON_STRING(f1) AS $f1])
       +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f1], metadata=[]]], fields=[f1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJsonObjectAgg[batchMode = true]">
+    <Resource name="sql">
+      <![CDATA[SELECT JSON_OBJECTAGG(f1 VALUE f1) FROM T]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[JSON_OBJECTAGG_NULL_ON_NULL($0, $0)])
++- LogicalProject(f1=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+SortAggregate(isMerge=[true], select=[Final_JSON_OBJECTAGG_NULL_ON_NULL(EXPR$0) AS EXPR$0])
++- Exchange(distribution=[single])
+   +- LocalSortAggregate(select=[Partial_JSON_OBJECTAGG_NULL_ON_NULL($f1, $f1) AS EXPR$0])
+      +- Calc(select=[f1, JSON_STRING(f1) AS $f1])
+         +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f1], metadata=[]]], fields=[f1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJsonObjectAggInGroupWindow[batchMode = false]">
+    <Resource name="sql">
+      <![CDATA[SELECT f0, JSON_OBJECTAGG(f1 VALUE f0) FROM T GROUP BY f0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)])
++- LogicalProject(f0=[$0], f1=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(groupBy=[f0], select=[f0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f2) AS EXPR$1])
++- Exchange(distribution=[hash[f0]])
+   +- Calc(select=[f0, f1, JSON_STRING(f0) AS $f2])
+      +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f1], metadata=[]]], fields=[f0, f1])
 ]]>
     </Resource>
   </TestCase>
@@ -140,44 +382,44 @@ SortAggregate(isMerge=[true], groupBy=[f0], select=[f0, Final_JSON_OBJECTAGG_NUL
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJsonObjectAgg[batchMode = true]">
+  <TestCase name="testJsonObjectAggWithOtherAggs[batchMode = true]">
     <Resource name="sql">
-      <![CDATA[SELECT JSON_OBJECTAGG(f1 VALUE f1) FROM T]]>
+      <![CDATA[SELECT COUNT(*), JSON_OBJECTAGG(f1 VALUE f1) FROM T]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[JSON_OBJECTAGG_NULL_ON_NULL($0, $0)])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($0, $0)])
 +- LogicalProject(f1=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-SortAggregate(isMerge=[true], select=[Final_JSON_OBJECTAGG_NULL_ON_NULL(EXPR$0) AS EXPR$0])
+SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0, Final_JSON_OBJECTAGG_NULL_ON_NULL(EXPR$1) AS EXPR$1])
 +- Exchange(distribution=[single])
-   +- LocalSortAggregate(select=[Partial_JSON_OBJECTAGG_NULL_ON_NULL($f1, $f1) AS EXPR$0])
+   +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0, Partial_JSON_OBJECTAGG_NULL_ON_NULL($f1, $f1) AS EXPR$1])
       +- Calc(select=[f1, JSON_STRING(f1) AS $f1])
          +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f1], metadata=[]]], fields=[f1])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJsonObjectAggInGroupWindow[batchMode = false]">
+  <TestCase name="testJsonObjectAggWithOtherAggs[batchMode = false]">
     <Resource name="sql">
-      <![CDATA[SELECT f0, JSON_OBJECTAGG(f1 VALUE f0) FROM T GROUP BY f0]]>
+      <![CDATA[SELECT COUNT(*), JSON_OBJECTAGG(f1 VALUE f1) FROM T]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($1, $0)])
-+- LogicalProject(f0=[$0], f1=[$1])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[JSON_OBJECTAGG_NULL_ON_NULL($0, $0)])
++- LogicalProject(f1=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-GroupAggregate(groupBy=[f0], select=[f0, JSON_OBJECTAGG_NULL_ON_NULL(f1, $f2) AS EXPR$1])
-+- Exchange(distribution=[hash[f0]])
-   +- Calc(select=[f0, f1, JSON_STRING(f0) AS $f2])
-      +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f0, f1], metadata=[]]], fields=[f0, f1])
+GroupAggregate(select=[COUNT(*) AS EXPR$0, JSON_OBJECTAGG_NULL_ON_NULL($f1, $f1) AS EXPR$1])
++- Exchange(distribution=[single])
+   +- Calc(select=[f1, JSON_STRING(f1) AS $f1])
+      +- TableSourceScan(table=[[default_catalog, default_database, T, project=[f1], metadata=[]]], fields=[f1])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
index ad6e14de7ea..262fbeca2ab 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala
@@ -365,6 +365,42 @@ class SortAggITCase extends AggregateITCaseBase("SortAggregate") {
       Seq(row(4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L, 4L))
     )
   }
+
+  @Test
+  def testJsonArrayAggAndJsonObjectAggWithOtherAggs(): Unit = {
+    val sql =
+      s"""
+         |SELECT
+         |  MAX(d), JSON_OBJECTAGG(g VALUE d), JSON_ARRAYAGG(d), JSON_ARRAYAGG(g)
+         |FROM Table5 WHERE d <= 3
+         |""".stripMargin
+    checkResult(
+      sql,
+      Seq(row("3, {\"ABC\":3,\"BCD\":3,\"Hallo\":1,\"Hallo Welt\":2," +
+        "\"Hallo Welt wie\":2,\"Hallo Welt wie gehts?\":3}, [1,2,2,3,3,3], " +
+        "[\"Hallo\",\"Hallo Welt\",\"Hallo Welt wie\",\"Hallo Welt wie gehts?\",\"ABC\",\"BCD\"]"))
+    )
+  }
+
+  @Test
+  def testGroupJsonArrayAggAndJsonObjectAggWithOtherAggs(): Unit = {
+    val sql =
+      s"""
+         |SELECT
+         |  d, JSON_OBJECTAGG(g VALUE f), JSON_ARRAYAGG(g), JSON_ARRAYAGG(f), max(f)
+         |FROM Table5 WHERE d <= 3 GROUP BY d
+         |""".stripMargin
+    checkResult(
+      sql,
+      Seq(
+        row("1, {\"Hallo\":0}, [\"Hallo\"], [0], 0"),
+        row(
+          "2, {\"Hallo Welt\":1,\"Hallo Welt wie\":2}, [\"Hallo Welt\",\"Hallo Welt wie\"], [1,2], 2"),
+        row(
+          "3, {\"ABC\":4,\"BCD\":5,\"Hallo Welt wie gehts?\":3}, [\"Hallo Welt wie gehts?\",\"ABC\",\"BCD\"], [3,4,5], 5")
+      )
+    )
+  }
 }
 
 class MyPojoAggFunction extends AggregateFunction[MyPojo, CountAccumulator] {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 771373292b9..63483de3d80 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -1796,4 +1796,47 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State
     val expected = List("null,null", "15,15", "11,11")
     assertEquals(expected, sink.getRetractResults)
   }
+
+  @Test
+  def testJsonArrayAggAndJsonObjectAggWithOtherAggs(): Unit = {
+    val sink = new TestingRetractSink
+    val sql =
+      s"""
+         |SELECT
+         |  MAX(d), JSON_OBJECTAGG(g VALUE d), JSON_ARRAYAGG(d), JSON_ARRAYAGG(g)
+         |FROM Table5 WHERE d <= 3
+         |""".stripMargin
+
+    val t = failingDataSource(TestData.tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    tEnv.createTemporaryView("Table5", t)
+    tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+    val expected =
+      List(
+        "3,{\"ABC\":3,\"BCD\":3,\"Hallo\":1,\"Hallo Welt\":2,\"Hallo Welt wie\":2,\"Hallo Welt wie gehts?\":3}," +
+          "[1,2,2,3,3,3],[\"Hallo\",\"Hallo Welt\",\"Hallo Welt wie\",\"Hallo Welt wie gehts?\",\"ABC\",\"BCD\"]")
+    assertEquals(expected, sink.getRetractResults)
+  }
+
+  @Test
+  def testGroupJsonArrayAggAndJsonObjectAggWithOtherAggs(): Unit = {
+    val sink = new TestingRetractSink
+    val sql =
+      s"""
+         |SELECT
+         |  d, JSON_OBJECTAGG(g VALUE f), JSON_ARRAYAGG(g), JSON_ARRAYAGG(f), max(f)
+         |FROM Table5 WHERE d <= 3 GROUP BY d
+         |""".stripMargin
+    val t = failingDataSource(TestData.tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    tEnv.createTemporaryView("Table5", t)
+    tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+    val expected =
+      List(
+        "3,{\"ABC\":4,\"BCD\":5,\"Hallo Welt wie gehts?\":3},[\"Hallo Welt wie gehts?\",\"ABC\",\"BCD\"],[3,4,5],5",
+        "1,{\"Hallo\":0},[\"Hallo\"],[0],0",
+        "2,{\"Hallo Welt\":1,\"Hallo Welt wie\":2},[\"Hallo Welt\",\"Hallo Welt wie\"],[1,2],2"
+      )
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
 }