Posted to commits@flink.apache.org by dw...@apache.org on 2024/01/08 12:20:35 UTC

(flink) branch master updated (ed79a1fc312 -> 0df5ab5a331)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from ed79a1fc312 [hotfix][api] Adds @PublicEvolving to StateTtlConfig inner classes to remove ArchUnit exclusions
     new df71d07188e [FLINK-34000] Implement restore tests for IncrementalGroupAgg node
     new 0df5ab5a331 [FLINK-34000] Remove IncrementalGroupAgg Json Plan & IT tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.


Summary of changes:
 .../stream/IncrementalAggregateJsonPlanTest.java   | 106 -------------
 ...a => IncrementalGroupAggregateRestoreTest.java} |  12 +-
 .../IncrementalGroupAggregateTestPrograms.java     | 119 ++++++++++++++
 .../IncrementalAggregateJsonPlanITCase.java        |  78 ---------
 .../plan/incremental-group-aggregate-complex.json} | 176 ++++++++++-----------
 .../savepoint/_metadata                            | Bin 0 -> 20817 bytes
 .../plan/incremental-group-aggregate-simple.json}  | 116 ++++++--------
 .../savepoint/_metadata                            | Bin 0 -> 14768 bytes
 8 files changed, 251 insertions(+), 356 deletions(-)
 delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
 copy flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/{TemporalJoinRestoreTest.java => IncrementalGroupAggregateRestoreTest.java} (72%)
 create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java
 delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
 rename flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out => restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json} (81%)
 create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata
 rename flink-table/flink-table-planner/src/test/resources/{org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out => restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json} (71%)
 create mode 100644 flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata


(flink) 02/02: [FLINK-34000] Remove IncrementalGroupAgg Json Plan & IT tests

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0df5ab5a3318d21e8be3ab9237900664e3741013
Author: bvarghese1 <bv...@confluent.io>
AuthorDate: Thu Jan 4 20:07:47 2024 -0800

    [FLINK-34000] Remove IncrementalGroupAgg Json Plan & IT tests
    
    - These are covered by the restore tests
---
 .../stream/IncrementalAggregateJsonPlanTest.java   | 106 ----
 .../IncrementalAggregateJsonPlanITCase.java        |  78 ---
 .../testIncrementalAggregate.out                   | 401 --------------
 ...lAggregateWithSumCountDistinctAndRetraction.out | 585 ---------------------
 4 files changed, 1170 deletions(-)
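For context, the coverage removed here now lives in the restore tests added in
commit 01/02 below. The following is a condensed, illustrative sketch of that
pattern, trimmed from the actual test added in that commit (the full version
registers both the simple and complex programs); it uses only the
RestoreTestBase and TableTestProgram classes that appear there.

    package org.apache.flink.table.planner.plan.nodes.exec.stream;

    import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
    import org.apache.flink.table.test.program.TableTestProgram;

    import java.util.Collections;
    import java.util.List;

    /** Restore test bound to the IncrementalGroupAggregate exec node. */
    public class IncrementalGroupAggregateRestoreTest extends RestoreTestBase {

        public IncrementalGroupAggregateRestoreTest() {
            // Each restore test targets exactly one ExecNode implementation.
            super(StreamExecIncrementalGroupAggregate.class);
        }

        @Override
        public List<TableTestProgram> programs() {
            // Each registered program is backed by a compiled plan JSON and a
            // savepoint under src/test/resources/restore-tests/ (see the files
            // added in commit 01/02), which replaces the separate JSON plan
            // and IT test coverage deleted in this commit.
            return Collections.singletonList(
                    IncrementalGroupAggregateTestPrograms.INCREMENTAL_GROUP_AGGREGATE_SIMPLE);
        }
    }
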

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
deleted file mode 100644
index 26dcc04f303..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule;
-import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.time.Duration;
-
-/** Test json serialization/deserialization for incremental aggregate. */
-class IncrementalAggregateJsonPlanTest extends TableTestBase {
-
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @BeforeEach
-    void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-        tEnv.getConfig()
-                .set(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        AggregatePhaseStrategy.TWO_PHASE.name())
-                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-                .set(
-                        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                        Duration.ofSeconds(10))
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
-                .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true);
-
-        String srcTableDdl =
-                "CREATE TABLE MyTable (\n"
-                        + "  a bigint,\n"
-                        + "  b int not null,\n"
-                        + "  c varchar,\n"
-                        + "  d bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'false')";
-        tEnv.executeSql(srcTableDdl);
-    }
-
-    @Test
-    void testIncrementalAggregate() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a bigint,\n"
-                        + "  c bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select a, "
-                        + "count(distinct c) as c "
-                        + "from MyTable group by a");
-    }
-
-    @Test
-    void testIncrementalAggregateWithSumCountDistinctAndRetraction() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  b bigint,\n"
-                        + "  sum_b int,\n"
-                        + "  cnt_distinct_b bigint,\n"
-                        + "  cnt1 bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink "
-                        + "select b, sum(b1), count(distinct b1), count(1) "
-                        + " from "
-                        + "   (select a, count(b) as b, max(b) as b1 from MyTable group by a)"
-                        + " group by b");
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
deleted file mode 100644
index 6f72a3930a8..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IncrementalAggregateJsonPlanITCase.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule;
-import org.apache.flink.table.planner.runtime.utils.TestData;
-import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-/** Test for incremental aggregate json plan. */
-class IncrementalAggregateJsonPlanITCase extends JsonPlanTestBase {
-
-    @BeforeEach
-    @Override
-    protected void setup() throws Exception {
-        super.setup();
-        tableEnv.getConfig()
-                .set(
-                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
-                        AggregatePhaseStrategy.TWO_PHASE.name())
-                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-                .set(
-                        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                        Duration.ofSeconds(10))
-                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
-                .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true);
-    }
-
-    @Test
-    void testIncrementalAggregate() throws IOException, ExecutionException, InterruptedException {
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.smallData3()),
-                "a int",
-                "b bigint",
-                "c varchar");
-        createTestNonInsertOnlyValuesSinkTable(
-                "MySink", "b bigint", "a bigint", "primary key (b) not enforced");
-        compileSqlAndExecutePlan(
-                        "insert into MySink select b, "
-                                + "count(distinct a) as a "
-                                + "from MyTable group by b")
-                .await();
-
-        List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(Arrays.asList("+I[1, 1]", "+I[2, 2]"), result);
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
deleted file mode 100644
index 46cc85e26f3..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
+++ /dev/null
@@ -1,401 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 2 ] ],
-        "producedType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]], fields=[a, c])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-mini-batch-assigner_1",
-    "miniBatchInterval" : {
-      "interval" : 10000,
-      "mode" : "ProcTime"
-    },
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647)>",
-    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "VARCHAR(2147483647)"
-    }, {
-      "kind" : "CALL",
-      "internalName" : "$MOD$1",
-      "operands" : [ {
-        "kind" : "CALL",
-        "internalName" : "$HASH_CODE$1",
-        "operands" : [ {
-          "kind" : "INPUT_REF",
-          "inputIndex" : 1,
-          "type" : "VARCHAR(2147483647)"
-        } ],
-        "type" : "INT"
-      }, {
-        "kind" : "LITERAL",
-        "value" : 1024,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "INT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647), `$f2` INT>",
-    "description" : "Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-local-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0, 2 ],
-    "aggCalls" : [ {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ false ],
-    "needRetraction" : false,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "a",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "$f2",
-        "fieldType" : "INT"
-      }, {
-        "name" : "count$0",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "distinct$0",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "logicalType" : {
-              "type" : "STRUCTURED_TYPE",
-              "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
-              "attributes" : [ {
-                "name" : "map",
-                "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>"
-              } ]
-            },
-            "fields" : [ {
-              "name" : "map",
-              "keyClass" : {
-                "conversionClass" : "org.apache.flink.table.data.StringData"
-              }
-            } ]
-          }
-        }
-      } ]
-    },
-    "description" : "LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0, 1 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "a",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "$f2",
-        "fieldType" : "INT"
-      }, {
-        "name" : "count$0",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "distinct$0",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "logicalType" : {
-              "type" : "STRUCTURED_TYPE",
-              "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
-              "attributes" : [ {
-                "name" : "map",
-                "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>"
-              } ]
-            },
-            "fields" : [ {
-              "name" : "map",
-              "keyClass" : {
-                "conversionClass" : "org.apache.flink.table.data.StringData"
-              }
-            } ]
-          }
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[a, $f2]])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-incremental-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "partialAggGrouping" : [ 0, 1 ],
-    "finalAggGrouping" : [ 0 ],
-    "partialOriginalAggCalls" : [ {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "partialAggCallNeedRetractions" : [ false ],
-    "partialLocalAggInputRowType" : "ROW<`a` BIGINT, `c` VARCHAR(2147483647), `$f2` INT>",
-    "partialAggNeedRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "incrementalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `count$0` BIGINT>",
-    "description" : "IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `count$0` BIGINT>",
-    "description" : "Exchange(distribution=[hash[a]])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-global-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : null,
-      "internalName" : "$$SUM0$1",
-      "argList" : [ 2 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ false ],
-    "localAggInputRowType" : "ROW<`a` BIGINT, `$f2` INT, `$f2_0` BIGINT NOT NULL>",
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "globalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL>",
-    "description" : "GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1])"
-  }, {
-    "id" : 9,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "c",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, $f1])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 8,
-    "target" : 9,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
deleted file mode 100644
index 7a48fa0143f..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
+++ /dev/null
@@ -1,585 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-mini-batch-assigner_1",
-    "miniBatchInterval" : {
-      "interval" : 10000,
-      "mode" : "ProcTime"
-    },
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-local-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "b",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "b1",
-      "internalName" : "$MAX$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ false, false ],
-    "needRetraction" : false,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `count1$0` BIGINT, `max$1` INT>",
-    "description" : "LocalGroupAggregate(groupBy=[a], select=[a, COUNT(*) AS count1$0, MAX(b) AS max$1])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `count1$0` BIGINT, `max$1` INT>",
-    "description" : "Exchange(distribution=[hash[a]])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-global-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "b",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "b1",
-      "internalName" : "$MAX$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ false, false ],
-    "localAggInputRowType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "globalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` BIGINT NOT NULL, `b1` INT NOT NULL>",
-    "description" : "GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count1$0) AS b, MAX(max$1) AS b1])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "CALL",
-      "internalName" : "$MOD$1",
-      "operands" : [ {
-        "kind" : "CALL",
-        "internalName" : "$HASH_CODE$1",
-        "operands" : [ {
-          "kind" : "INPUT_REF",
-          "inputIndex" : 2,
-          "type" : "INT NOT NULL"
-        } ],
-        "type" : "INT NOT NULL"
-      }, {
-        "kind" : "LITERAL",
-        "value" : 1024,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "INT NOT NULL"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT NOT NULL, `b1` INT NOT NULL, `$f2` INT NOT NULL>",
-    "description" : "Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-local-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0, 2 ],
-    "aggCalls" : [ {
-      "name" : null,
-      "internalName" : "$SUM$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    }, {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ true, true, true ],
-    "needRetraction" : true,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "b",
-        "fieldType" : "BIGINT NOT NULL"
-      }, {
-        "name" : "$f2",
-        "fieldType" : "INT NOT NULL"
-      }, {
-        "name" : "sum$0",
-        "fieldType" : "INT"
-      }, {
-        "name" : "count$1",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$2",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count1$3",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "distinct$0",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "type" : "STRUCTURED_TYPE",
-            "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
-            "attributes" : [ {
-              "name" : "map",
-              "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>"
-            } ]
-          }
-        }
-      } ]
-    },
-    "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS count1$3, DISTINCT(b1) AS distinct$0])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0, 1 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "b",
-        "fieldType" : "BIGINT NOT NULL"
-      }, {
-        "name" : "$f2",
-        "fieldType" : "INT NOT NULL"
-      }, {
-        "name" : "sum$0",
-        "fieldType" : "INT"
-      }, {
-        "name" : "count$1",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$2",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count1$3",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "distinct$0",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "type" : "STRUCTURED_TYPE",
-            "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
-            "attributes" : [ {
-              "name" : "map",
-              "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>"
-            } ]
-          }
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[b, $f2]])"
-  }, {
-    "id" : 9,
-    "type" : "stream-exec-incremental-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "partialAggGrouping" : [ 0, 1 ],
-    "finalAggGrouping" : [ 0 ],
-    "partialOriginalAggCalls" : [ {
-      "name" : null,
-      "internalName" : "$SUM$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    }, {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "partialAggCallNeedRetractions" : [ true, true, true ],
-    "partialLocalAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `b1` INT NOT NULL, `$f2` INT NOT NULL>",
-    "partialAggNeedRetraction" : true,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "incrementalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` INT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>",
-    "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3) AS count1$3])"
-  }, {
-    "id" : 10,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` INT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>",
-    "description" : "Exchange(distribution=[hash[b]])"
-  }, {
-    "id" : 11,
-    "type" : "stream-exec-global-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : null,
-      "internalName" : "$SUM$1",
-      "argList" : [ 2 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    }, {
-      "name" : null,
-      "internalName" : "$$SUM0$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : null,
-      "internalName" : "$$SUM0$1",
-      "argList" : [ 4 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ true, true, true ],
-    "localAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `$f2` INT NOT NULL, `$f2_0` INT NOT NULL, `$f3` BIGINT NOT NULL, `$f4` BIGINT NOT NULL>",
-    "generateUpdateBefore" : true,
-    "needRetraction" : true,
-    "indexOfCountStar" : 2,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "globalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` INT NOT NULL, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>",
-    "description" : "GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(count$2) AS $f2, $SUM0_RETRACT(count1$3) AS $f3], indexOfCountStar=[2])"
-  }, {
-    "id" : 12,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "sum_b",
-              "dataType" : "INT"
-            }, {
-              "name" : "cnt_distinct_b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt1",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` INT NOT NULL, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, $f1, $f2, $f3])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 8,
-    "target" : 9,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 9,
-    "target" : 10,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 10,
-    "target" : 11,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 11,
-    "target" : 12,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}


(flink) 01/02: [FLINK-34000] Implement restore tests for IncrementalGroupAgg node

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df71d07188e745553b8174297ec7989f05cebf7a
Author: bvarghese1 <bv...@confluent.io>
AuthorDate: Thu Jan 4 20:05:38 2024 -0800

    [FLINK-34000] Implement restore tests for IncrementalGroupAgg node
---
 .../IncrementalGroupAggregateRestoreTest.java      |  40 ++
 .../IncrementalGroupAggregateTestPrograms.java     | 119 +++++
 .../plan/incremental-group-aggregate-complex.json  | 573 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 20817 bytes
 .../plan/incremental-group-aggregate-simple.json   | 373 ++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 14768 bytes
 6 files changed, 1105 insertions(+)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java
new file mode 100644
index 00000000000..250f50a38c7
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecIncrementalGroupAggregate}. */
+public class IncrementalGroupAggregateRestoreTest extends RestoreTestBase {
+
+    public IncrementalGroupAggregateRestoreTest() {
+        super(StreamExecIncrementalGroupAggregate.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                IncrementalGroupAggregateTestPrograms.INCREMENTAL_GROUP_AGGREGATE_SIMPLE,
+                IncrementalGroupAggregateTestPrograms.INCREMENTAL_GROUP_AGGREGATE_COMPLEX);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java
new file mode 100644
index 00000000000..a1ca086d258
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.time.Duration;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecGroupAggregate}. */
+public class IncrementalGroupAggregateTestPrograms {
+
+    static final Row[] BEFORE_DATA = {
+        Row.of(1, 1L, "hi"), Row.of(2, 2L, "hello"), Row.of(3, 2L, "hello world")
+    };
+
+    static final Row[] AFTER_DATA = {
+        Row.of(3, 2L, "foo"), Row.of(4, 4L, "bar"), Row.of(5, 2L, "foo bar")
+    };
+
+    static final String[] SOURCE_SCHEMA = {"a INT", "b BIGINT", "c VARCHAR"};
+
+    static final TableTestProgram INCREMENTAL_GROUP_AGGREGATE_SIMPLE =
+            TableTestProgram.of(
+                            "incremental-group-aggregate-simple",
+                            "validates incremental group aggregation")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                    .setupConfig(
+                            ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                            Duration.ofSeconds(1))
+                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                    .setupConfig(
+                            OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("b BIGINT", "a BIGINT")
+                                    .consumedBeforeRestore("+I[1, 1]", "+I[2, 2]")
+                                    .consumedAfterRestore("-U[2, 2]", "+U[2, 3]", "+I[4, 1]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t\n"
+                                    + "     SELECT\n"
+                                    + "         b,\n"
+                                    + "         COUNT(DISTINCT a) AS a\n"
+                                    + "     FROM source_t\n"
+                                    + "     GROUP BY b")
+                    .build();
+
+    static final TableTestProgram INCREMENTAL_GROUP_AGGREGATE_COMPLEX =
+            TableTestProgram.of(
+                            "incremental-group-aggregate-complex",
+                            "validates incremental group aggregation with multiple aggregations")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                    .setupConfig(
+                            ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                            Duration.ofSeconds(1))
+                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                    .setupConfig(
+                            OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "b BIGINT",
+                                            "sum_b BIGINT",
+                                            "cnt_distinct_b BIGINT",
+                                            "cnt_1 BIGINT")
+                                    .consumedBeforeRestore("+I[1, 5, 2, 3]")
+                                    .consumedAfterRestore(
+                                            "+I[2, 2, 1, 1]", "-U[1, 5, 2, 3]", "+U[1, 9, 3, 4]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT\n"
+                                    + "         b,\n"
+                                    + "         SUM(b1),\n"
+                                    + "         COUNT(DISTINCT b1),\n"
+                                    + "         COUNT(1)\n"
+                                    + "     FROM\n"
+                                    + "         (\n"
+                                    + "             SELECT\n"
+                                    + "                     a,\n"
+                                    + "                     COUNT(b) AS b,\n"
+                                    + "                     MAX(b) AS b1\n"
+                                    + "             FROM source_t GROUP BY a\n"
+                                    + "         )\n"
+                                    + "     GROUP BY b")
+                    .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json
new file mode 100644
index 00000000000..e65797b211f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json
@@ -0,0 +1,573 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 10,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ] ],
+        "producedType" : "ROW<`a` INT, `b` BIGINT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` INT, `b` BIGINT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[a, b], metadata=[]]], fields=[a, b])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-mini-batch-assigner_1",
+    "miniBatchInterval" : {
+      "interval" : 1000,
+      "mode" : "ProcTime"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT>",
+    "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])"
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-local-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "b",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : "b1",
+      "internalName" : "$MAX$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    } ],
+    "aggCallNeedRetractions" : [ false, false ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `count$0` BIGINT, `max$1` BIGINT>",
+    "description" : "LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS count$0, MAX(b) AS max$1])"
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `count$0` BIGINT, `max$1` BIGINT>",
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 14,
+    "type" : "stream-exec-global-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "b",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : "b1",
+      "internalName" : "$MAX$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    } ],
+    "aggCallNeedRetractions" : [ false, false ],
+    "localAggInputRowType" : "ROW<`a` INT, `b` BIGINT>",
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "globalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT NOT NULL, `b1` BIGINT>",
+    "description" : "GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b, MAX(max$1) AS b1])"
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$MOD$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "internalName" : "$HASH_CODE$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "BIGINT"
+        } ],
+        "type" : "INT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1024,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT NOT NULL, `b1` BIGINT, `$f2` INT>",
+    "description" : "Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-local-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0, 2 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "internalName" : "$SUM$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    }, {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ true, true, true ],
+    "needRetraction" : true,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT NOT NULL"
+      }, {
+        "name" : "$f2",
+        "fieldType" : "INT"
+      }, {
+        "name" : "sum$0",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count$1",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count$2",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count1$3",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "distinct$0",
+        "fieldType" : {
+          "type" : "RAW",
+          "class" : "org.apache.flink.table.api.dataview.MapView",
+          "externalDataType" : {
+            "type" : "STRUCTURED_TYPE",
+            "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
+            "attributes" : [ {
+              "name" : "map",
+              "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>"
+            } ]
+          }
+        }
+      } ]
+    },
+    "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS count1$3, DISTINCT(b1) AS distinct$0])"
+  }, {
+    "id" : 17,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT NOT NULL"
+      }, {
+        "name" : "$f2",
+        "fieldType" : "INT"
+      }, {
+        "name" : "sum$0",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count$1",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count$2",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count1$3",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "distinct$0",
+        "fieldType" : {
+          "type" : "RAW",
+          "class" : "org.apache.flink.table.api.dataview.MapView",
+          "externalDataType" : {
+            "type" : "STRUCTURED_TYPE",
+            "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
+            "attributes" : [ {
+              "name" : "map",
+              "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>"
+            } ]
+          }
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[b, $f2]])"
+  }, {
+    "id" : 18,
+    "type" : "stream-exec-incremental-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "partialAggGrouping" : [ 0, 1 ],
+    "finalAggGrouping" : [ 0 ],
+    "partialOriginalAggCalls" : [ {
+      "name" : null,
+      "internalName" : "$SUM$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    }, {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "partialAggCallNeedRetractions" : [ true, true, true ],
+    "partialLocalAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `b1` BIGINT, `$f2` INT>",
+    "partialAggNeedRetraction" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "incrementalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` BIGINT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>",
+    "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3) AS count1$3])"
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` BIGINT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>",
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 20,
+    "type" : "stream-exec-global-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "internalName" : "$SUM$1",
+      "argList" : [ 2 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    }, {
+      "name" : null,
+      "internalName" : "$$SUM0$1",
+      "argList" : [ 3 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : null,
+      "internalName" : "$$SUM0$1",
+      "argList" : [ 4 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ true, true, true ],
+    "localAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `$f2` INT, `$f2_0` BIGINT, `$f3` BIGINT NOT NULL, `$f4` BIGINT NOT NULL>",
+    "generateUpdateBefore" : true,
+    "needRetraction" : true,
+    "indexOfCountStar" : 2,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "globalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` BIGINT, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>",
+    "description" : "GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(count$2) AS $f2, $SUM0_RETRACT(count1$3) AS $f3], indexOfCountStar=[2])"
+  }, {
+    "id" : 21,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "sum_b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "cnt_distinct_b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "cnt_1",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` BIGINT, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[b, $f1, $f2, $f3])"
+  } ],
+  "edges" : [ {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata
new file mode 100644
index 00000000000..e92f8f6214c
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata differ
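Note on the complex plan above: it encodes the two-level split of the distinct aggregate (partialFinalType=[PARTIAL] in the second LocalGroupAggregate, [FINAL] in the last GlobalGroupAggregate) with the stream-exec-incremental-group-aggregate node in between. Judging from the operator descriptions and the sink_t schema (b, sum_b, cnt_distinct_b, cnt_1), the statement behind this plan is roughly of the following shape; this is only a sketch inferred from the plan and the SQL fragment visible in the test program above, the authoritative statement lives in the test-programs class added in this change:

    -- Sketch inferred from the plan's operator descriptions and the sink_t schema;
    -- not the literal test SQL.
    INSERT INTO sink_t
    SELECT
        b,
        SUM(b1)            AS sum_b,
        COUNT(DISTINCT b1) AS cnt_distinct_b,
        COUNT(1)           AS cnt_1
    FROM (
        SELECT a, COUNT(b) AS b, MAX(b) AS b1
        FROM source_t GROUP BY a
    )
    GROUP BY b

The inner aggregation matches the SQL fragment shown in the test program diff; mini-batch execution and distinct-aggregate splitting (table.optimizer.distinct-agg.split.enabled) are presumably switched on in the test setup, since only then does the planner produce the incremental group aggregate shown here.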
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json
new file mode 100644
index 00000000000..e995a2b138a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json
@@ -0,0 +1,373 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 1 ], [ 0 ] ],
+        "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`b` BIGINT, `a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[b, a], metadata=[]]], fields=[b, a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-mini-batch-assigner_1",
+    "miniBatchInterval" : {
+      "interval" : 1000,
+      "mode" : "ProcTime"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `a` INT>",
+    "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$MOD$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "internalName" : "$HASH_CODE$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : "INT"
+        } ],
+        "type" : "INT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1024,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `a` INT, `$f2` INT>",
+    "description" : "Calc(select=[b, a, MOD(HASH_CODE(a), 1024) AS $f2])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-local-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0, 2 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "$f2",
+        "fieldType" : "INT"
+      }, {
+        "name" : "count$0",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "distinct$0",
+        "fieldType" : {
+          "type" : "RAW",
+          "class" : "org.apache.flink.table.api.dataview.MapView",
+          "externalDataType" : {
+            "type" : "STRUCTURED_TYPE",
+            "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
+            "attributes" : [ {
+              "name" : "map",
+              "attributeType" : "MAP<INT, BIGINT NOT NULL>"
+            } ]
+          }
+        }
+      } ]
+    },
+    "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, COUNT(distinct$0 a) AS count$0, DISTINCT(a) AS distinct$0])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "$f2",
+        "fieldType" : "INT"
+      }, {
+        "name" : "count$0",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "distinct$0",
+        "fieldType" : {
+          "type" : "RAW",
+          "class" : "org.apache.flink.table.api.dataview.MapView",
+          "externalDataType" : {
+            "type" : "STRUCTURED_TYPE",
+            "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
+            "attributes" : [ {
+              "name" : "map",
+              "attributeType" : "MAP<INT, BIGINT NOT NULL>"
+            } ]
+          }
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[b, $f2]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-incremental-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "partialAggGrouping" : [ 0, 1 ],
+    "finalAggGrouping" : [ 0 ],
+    "partialOriginalAggCalls" : [ {
+      "name" : null,
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "partialAggCallNeedRetractions" : [ false ],
+    "partialLocalAggInputRowType" : "ROW<`b` BIGINT, `a` INT, `$f2` INT>",
+    "partialAggNeedRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "incrementalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `count$0` BIGINT>",
+    "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], finalAggGrouping=[b], select=[b, COUNT(distinct$0 count$0) AS count$0])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `count$0` BIGINT>",
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-global-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : null,
+      "internalName" : "$$SUM0$1",
+      "argList" : [ 2 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false ],
+    "localAggInputRowType" : "ROW<`b` BIGINT, `$f2` INT, `$f2_0` BIGINT NOT NULL>",
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "globalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `$f1` BIGINT NOT NULL>",
+    "description" : "GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, $SUM0(count$0) AS $f1])"
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `$f1` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[b, $f1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata
new file mode 100644
index 00000000000..b0e73b1a395
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata differ
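Note on the simple plan: it exercises the same split for a single COUNT(DISTINCT ...) without retraction on the partial side. From its descriptions (groupBy=[b], the distinct key bucketed via MOD(HASH_CODE(a), 1024)) and the sink_t schema (b, a), the underlying query is presumably along these lines; again a sketch inferred from the plan, not the literal test SQL:

    -- Sketch inferred from the simple plan; the actual statement is defined
    -- in the test-programs class added in this change.
    INSERT INTO sink_t
    SELECT b, COUNT(DISTINCT a) AS a
    FROM source_t
    GROUP BY b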