You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/08/01 11:08:59 UTC

[flink] branch master updated: [FLINK-28752][python][table-planner] Add the json plan support in Python UDFs

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ff6978c0eb [FLINK-28752][python][table-planner] Add the json plan support in Python UDFs
6ff6978c0eb is described below

commit 6ff6978c0eb56a9cd036a6bafdd049d0d8de9ba8
Author: huangxingbo <hx...@apache.org>
AuthorDate: Sat Jul 30 19:07:40 2022 +0800

    [FLINK-28752][python][table-planner] Add the json plan support in Python UDFs
    
    This closes #20398.
---
 .../pyflink/table/tests/test_pandas_udaf.py        |   3 +-
 flink-python/pyflink/table/tests/test_udaf.py      |   7 +-
 flink-python/pyflink/table/tests/test_udf.py       |   3 +-
 flink-python/pyflink/table/tests/test_udtf.py      |   3 +-
 .../nodes/exec/common/CommonExecPythonCalc.java    |  10 +-
 .../exec/common/CommonExecPythonCorrelate.java     |  12 +-
 .../nodes/exec/stream/StreamExecPythonCalc.java    |  26 +-
 .../exec/stream/StreamExecPythonCorrelate.java     |  28 +-
 .../stream/StreamExecPythonGroupAggregate.java     |  42 +-
 .../StreamExecPythonGroupWindowAggregate.java      |  56 ++-
 .../exec/stream/StreamExecPythonOverAggregate.java |  35 +-
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |  10 +-
 .../nodes/exec/stream/PythonCalcJsonPlanTest.java  |  81 +++
 .../exec/stream/PythonCorrelateJsonPlanTest.java   |  89 ++++
 .../stream/PythonGroupAggregateJsonPlanTest.java   |  70 +++
 .../PythonGroupWindowAggregateJsonPlanTest.java    | 161 ++++++
 .../stream/PythonOverAggregateJsonPlanTest.java    | 147 ++++++
 .../testPythonCalc.out                             | 133 +++++
 .../testPythonFunctionInWhereClause.out            | 216 ++++++++
 .../testJoinWithFilter.out                         | 264 ++++++++++
 .../testPythonTableFunction.out                    | 210 ++++++++
 .../tesPythonAggCallsWithGroupBy.out               | 232 +++++++++
 .../testEventTimeHopWindow.out                     | 390 +++++++++++++++
 .../testEventTimeSessionWindow.out                 | 388 +++++++++++++++
 .../testEventTimeTumbleWindow.out                  | 513 +++++++++++++++++++
 .../testProcTimeHopWindow.out                      | 443 +++++++++++++++++
 .../testProcTimeSessionWindow.out                  | 441 +++++++++++++++++
 .../testProcTimeTumbleWindow.out                   | 542 +++++++++++++++++++++
 .../testProcTimeBoundedNonPartitionedRangeOver.out | 448 +++++++++++++++++
 .../testProcTimeBoundedPartitionedRangeOver.out    | 462 ++++++++++++++++++
 ...undedPartitionedRowsOverWithBuiltinProctime.out | 392 +++++++++++++++
 .../testProcTimeUnboundedPartitionedRangeOver.out  | 452 +++++++++++++++++
 .../testRowTimeBoundedPartitionedRowsOver.out      | 387 +++++++++++++++
 33 files changed, 6623 insertions(+), 73 deletions(-)

diff --git a/flink-python/pyflink/table/tests/test_pandas_udaf.py b/flink-python/pyflink/table/tests/test_pandas_udaf.py
index 71cd6380f41..b0b0ec6ebc9 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udaf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udaf.py
@@ -742,7 +742,6 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
                             "+I[3, 2.0, 4]"])
         os.remove(source_path)
 
-    @unittest.skip("Python UDFs are currently unsupported in JSON plan")
     def test_execute_over_aggregate_from_json_plan(self):
         # create source file path
         tmp_dir = self.tempdir
@@ -806,7 +805,7 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
             from source_table
         """)
         from py4j.java_gateway import get_method
-        get_method(self.t_env._j_tenv.executePlan(json_plan), "await")()
+        get_method(json_plan.execute(), "await")()
 
         import glob
         lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
diff --git a/flink-python/pyflink/table/tests/test_udaf.py b/flink-python/pyflink/table/tests/test_udaf.py
index e6f96527ff7..56c59da0dd3 100644
--- a/flink-python/pyflink/table/tests/test_udaf.py
+++ b/flink-python/pyflink/table/tests/test_udaf.py
@@ -17,7 +17,6 @@
 ################################################################################
 import collections
 import datetime
-import unittest
 from decimal import Decimal
 
 import pandas as pd
@@ -808,7 +807,6 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
                             "+I[1, 2018-03-11T03:10, 2018-03-11T04:10, 2]",
                             "+I[1, 2018-03-11T04:20, 2018-03-11T04:50, 1]"])
 
-    @unittest.skip("Python UDFs are currently unsupported in JSON plan")
     def test_execute_group_aggregate_from_json_plan(self):
         # create source file path
         tmp_dir = self.tempdir
@@ -845,9 +843,8 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
                                                       "SELECT a, my_sum(b) FROM source_table "
                                                       "GROUP BY a")
         from py4j.java_gateway import get_method
-        get_method(self.t_env._j_tenv.executePlan(json_plan), "await")()
+        get_method(json_plan.execute(), "await")()
 
-    @unittest.skip("Python UDFs are currently unsupported in JSON plan")
     def test_execute_group_window_aggregate_from_json_plan(self):
         # create source file path
         tmp_dir = self.tempdir
@@ -906,7 +903,7 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
                                                       "a, b, "
                                                       "SESSION(rowtime, INTERVAL '30' MINUTE)")
         from py4j.java_gateway import get_method
-        get_method(self.t_env._j_tenv.executePlan(json_plan), "await")()
+        get_method(json_plan.execute(), "await")()
 
         import glob
         lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 34801d7b76a..8c5a1fbff02 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -741,7 +741,6 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["+I[1970-01-01T00:00:00.123Z]"])
 
-    @unittest.skip("Python UDFs are currently unsupported in JSON plan")
     def test_execute_from_json_plan(self):
         # create source file path
         tmp_dir = self.tempdir
@@ -783,7 +782,7 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
                                                       "add_one(b) "
                                                       "FROM source_table")
         from py4j.java_gateway import get_method
-        get_method(self.t_env._j_tenv.executePlan(json_plan), "await")()
+        get_method(json_plan.execute(), "await")()
 
         import glob
         lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 5a730ffa5d8..f91c7ad0b50 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -75,7 +75,6 @@ class UserDefinedTableFunctionTests(object):
 class PyFlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
                                             PyFlinkStreamTableTestCase):
 
-    @unittest.skip("Python UDFs are currently unsupported in JSON plan")
     def test_execute_from_json_plan(self):
         # create source file path
         tmp_dir = self.tempdir
@@ -119,7 +118,7 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
                                                       " as T(x, y)"
                                                       " ON TRUE")
         from py4j.java_gateway import get_method
-        get_method(self.t_env._j_tenv.executePlan(json_plan), "await")()
+        get_method(json_plan.execute(), "await")()
 
         import glob
         lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
index 19eb4f38a5a..e102de9d063 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
@@ -48,6 +48,8 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexInputRef;
@@ -66,6 +68,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
         implements SingleTransformationTranslator<RowData> {
 
+    public static final String PYTHON_CALC_TRANSFORMATION = "python-calc";
+
+    public static final String FIELD_NAME_PROJECTION = "projection";
+
     private static final String PYTHON_SCALAR_FUNCTION_OPERATOR_NAME =
             "org.apache.flink.table.runtime.operators.python.scalar."
                     + "PythonScalarFunctionOperator";
@@ -78,6 +84,7 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
             "org.apache.flink.table.runtime.operators.python.scalar.arrow."
                     + "ArrowPythonScalarFunctionOperator";
 
+    @JsonProperty(FIELD_NAME_PROJECTION)
     private final List<RexNode> projection;
 
     public CommonExecPythonCalc(
@@ -171,8 +178,7 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
 
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                createTransformationMeta(PYTHON_CALC_TRANSFORMATION, config),
                 pythonOperator,
                 pythonOperatorResultTyeInfo,
                 inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
index 9c12d15329e..0de1483db13 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
@@ -46,6 +46,8 @@ import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
@@ -60,14 +62,21 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
         implements SingleTransformationTranslator<RowData> {
 
+    public static final String PYTHON_CORRELATE_TRANSFORMATION = "python-correlate";
+
+    public static final String FIELD_NAME_JOIN_TYPE = "joinType";
+    public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
+
     private static final String PYTHON_TABLE_FUNCTION_OPERATOR_NAME =
             "org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator";
 
     private static final String EMBEDDED_PYTHON_TABLE_FUNCTION_OPERATOR_NAME =
             "org.apache.flink.table.runtime.operators.python.table.EmbeddedPythonTableFunctionOperator";
 
+    @JsonProperty(FIELD_NAME_JOIN_TYPE)
     private final FlinkJoinType joinType;
 
+    @JsonProperty(FIELD_NAME_FUNCTION_CALL)
     private final RexCall invocation;
 
     public CommonExecPythonCorrelate(
@@ -129,8 +138,7 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
                         pythonUdtfInputOffsets);
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                createTransformationMeta(PYTHON_CORRELATE_TRANSFORMATION, pythonConfig),
                 pythonOperator,
                 pythonOperatorOutputRowType,
                 inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java
index 496ca96c1eb..39e42fef14d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java
@@ -18,20 +18,31 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCalc;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexNode;
 
 import java.util.Collections;
 import java.util.List;
 
 /** Stream {@link ExecNode} for Python ScalarFunctions. */
+@ExecNodeMetadata(
+        name = "stream-exec-python-calc",
+        version = 1,
+        producedTransformations = CommonExecPythonCalc.PYTHON_CALC_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v1_16,
+        minStateVersion = FlinkVersion.v1_16)
 public class StreamExecPythonCalc extends CommonExecPythonCalc implements StreamExecNode<RowData> {
 
     public StreamExecPythonCalc(
@@ -50,14 +61,15 @@ public class StreamExecPythonCalc extends CommonExecPythonCalc implements Stream
                 description);
     }
 
+    @JsonCreator
     public StreamExecPythonCalc(
-            int id,
-            ExecNodeContext context,
-            ReadableConfig persistedConfig,
-            List<RexNode> projection,
-            List<InputProperty> inputProperties,
-            RowType outputType,
-            String description) {
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_PROJECTION) List<RexNode> projection,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(id, context, persistedConfig, projection, inputProperties, outputType, description);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
index 0ce687a804c..bf215843ea5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
@@ -18,14 +18,19 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
 
@@ -33,6 +38,12 @@ import java.util.Collections;
 import java.util.List;
 
 /** Stream exec node which matches along with join a Python user defined table function. */
+@ExecNodeMetadata(
+        name = "stream-exec-python-correlate",
+        version = 1,
+        producedTransformations = CommonExecPythonCorrelate.PYTHON_CORRELATE_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v1_16,
+        minStateVersion = FlinkVersion.v1_16)
 public class StreamExecPythonCorrelate extends CommonExecPythonCorrelate
         implements StreamExecNode<RowData> {
 
@@ -54,15 +65,16 @@ public class StreamExecPythonCorrelate extends CommonExecPythonCorrelate
                 description);
     }
 
+    @JsonCreator
     public StreamExecPythonCorrelate(
-            int id,
-            ExecNodeContext context,
-            ReadableConfig persistedConfig,
-            FlinkJoinType joinType,
-            RexNode invocation,
-            List<InputProperty> inputProperties,
-            RowType outputType,
-            String description) {
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType joinType,
+            @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(
                 id,
                 context,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
index 9b9daf48400..55a8a8cd3d5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -33,6 +34,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -47,6 +49,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.slf4j.Logger;
@@ -61,21 +64,35 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Stream {@link ExecNode} for Python unbounded group aggregate. */
+@ExecNodeMetadata(
+        name = "stream-exec-python-group-aggregate",
+        version = 1,
+        producedTransformations =
+                StreamExecPythonGroupAggregate.PYTHON_GROUP_AGGREGATE_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v1_16,
+        minStateVersion = FlinkVersion.v1_16)
 public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase {
 
     private static final Logger LOG = LoggerFactory.getLogger(StreamExecPythonGroupAggregate.class);
 
+    public static final String PYTHON_GROUP_AGGREGATE_TRANSFORMATION = "python-group-aggregate";
+
     private static final String PYTHON_STREAM_AGGREAGTE_OPERATOR_NAME =
             "org.apache.flink.table.runtime.operators.python.aggregate.PythonStreamGroupAggregateOperator";
 
+    @JsonProperty(FIELD_NAME_GROUPING)
     private final int[] grouping;
 
+    @JsonProperty(FIELD_NAME_AGG_CALLS)
     private final AggregateCall[] aggCalls;
 
+    @JsonProperty(FIELD_NAME_AGG_CALL_NEED_RETRACTIONS)
     private final boolean[] aggCallNeedRetractions;
 
+    @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE)
     private final boolean generateUpdateBefore;
 
+    @JsonProperty(FIELD_NAME_NEED_RETRACTION)
     private final boolean needRetraction;
 
     public StreamExecPythonGroupAggregate(
@@ -105,17 +122,17 @@ public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase {
 
     @JsonCreator
     public StreamExecPythonGroupAggregate(
-            int id,
-            ExecNodeContext context,
-            ReadableConfig persistedConfig,
-            int[] grouping,
-            AggregateCall[] aggCalls,
-            boolean[] aggCallNeedRetractions,
-            boolean generateUpdateBefore,
-            boolean needRetraction,
-            List<InputProperty> inputProperties,
-            RowType outputType,
-            String description) {
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_GROUPING) int[] grouping,
+            @JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls,
+            @JsonProperty(FIELD_NAME_AGG_CALL_NEED_RETRACTIONS) boolean[] aggCallNeedRetractions,
+            @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean generateUpdateBefore,
+            @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(id, context, persistedConfig, inputProperties, outputType, description);
         this.grouping = checkNotNull(grouping);
         this.aggCalls = checkNotNull(aggCalls);
@@ -174,8 +191,7 @@ public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase {
         OneInputTransformation<RowData, RowData> transform =
                 ExecNodeUtil.createOneInputTransformation(
                         inputTransform,
-                        createTransformationName(config),
-                        createTransformationDescription(config),
+                        createTransformationMeta(PYTHON_GROUP_AGGREGATE_TRANSFORMATION, config),
                         operator,
                         InternalTypeInfo.of(getOutputType()),
                         inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
index 5676dd02997..df725ff3d85 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -44,6 +45,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -72,6 +74,9 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rel.core.AggregateCall;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,11 +101,24 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Stream {@link ExecNode} for group widow aggregate (Python user defined aggregate function). */
+@ExecNodeMetadata(
+        name = "stream-exec-python-group-window-aggregate",
+        version = 1,
+        producedTransformations =
+                StreamExecPythonGroupWindowAggregate.PYTHON_GROUP_WINDOW_AGGREGATE_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v1_16,
+        minStateVersion = FlinkVersion.v1_16)
 public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBase {
 
     private static final Logger LOGGER =
             LoggerFactory.getLogger(StreamExecPythonGroupWindowAggregate.class);
 
+    public static final String PYTHON_GROUP_WINDOW_AGGREGATE_TRANSFORMATION =
+            "python-group-window-aggregate";
+
+    public static final String FIELD_NAME_WINDOW = "window";
+    public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = "namedWindowProperties";
+
     private static final String ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME =
             "org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream."
                     + "StreamArrowPythonGroupWindowAggregateFunctionOperator";
@@ -117,16 +135,22 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
     private static final String GENERAL_STREAM_PYTHON_CREATE_SESSION_GROUP_WINDOW_METHOD =
             "createSessionGroupWindowAggregateOperator";
 
+    @JsonProperty(FIELD_NAME_GROUPING)
     private final int[] grouping;
 
+    @JsonProperty(FIELD_NAME_AGG_CALLS)
     private final AggregateCall[] aggCalls;
 
+    @JsonProperty(FIELD_NAME_WINDOW)
     private final LogicalWindow window;
 
+    @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
     private final NamedWindowProperty[] namedWindowProperties;
 
+    @JsonProperty(FIELD_NAME_NEED_RETRACTION)
     private final boolean needRetraction;
 
+    @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE)
     private final boolean generateUpdateBefore;
 
     public StreamExecPythonGroupWindowAggregate(
@@ -156,19 +180,21 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                 description);
     }
 
+    @JsonCreator
     public StreamExecPythonGroupWindowAggregate(
-            int id,
-            ExecNodeContext context,
-            ReadableConfig persistedConfig,
-            int[] grouping,
-            AggregateCall[] aggCalls,
-            LogicalWindow window,
-            NamedWindowProperty[] namedWindowProperties,
-            boolean generateUpdateBefore,
-            boolean needRetraction,
-            List<InputProperty> inputProperties,
-            RowType outputType,
-            String description) {
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_GROUPING) int[] grouping,
+            @JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls,
+            @JsonProperty(FIELD_NAME_WINDOW) LogicalWindow window,
+            @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
+                    NamedWindowProperty[] namedWindowProperties,
+            @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean generateUpdateBefore,
+            @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(id, context, persistedConfig, inputProperties, outputType, description);
         checkArgument(inputProperties.size() == 1);
         this.grouping = checkNotNull(grouping);
@@ -390,8 +416,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                         shiftTimeZone);
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                createTransformationMeta(PYTHON_GROUP_WINDOW_AGGREGATE_TRANSFORMATION, config),
                 pythonOperator,
                 InternalTypeInfo.of(outputRowType),
                 inputTransform.getParallelism());
@@ -431,8 +456,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
 
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                createTransformationMeta(PYTHON_GROUP_WINDOW_AGGREGATE_TRANSFORMATION, config),
                 pythonOperator,
                 InternalTypeInfo.of(outputRowType),
                 inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
index 626cd38b8ec..fd507b97d4a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -37,6 +38,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec;
@@ -51,6 +53,9 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rel.core.AggregateCall;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,11 +72,22 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Stream {@link ExecNode} for python time-based over operator. */
+@ExecNodeMetadata(
+        name = "stream-exec-python-over-aggregate",
+        version = 1,
+        producedTransformations =
+                StreamExecPythonOverAggregate.PYTHON_OVER_AGGREGATE_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v1_16,
+        minStateVersion = FlinkVersion.v1_16)
 public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
         implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
 
     private static final Logger LOG = LoggerFactory.getLogger(StreamExecPythonOverAggregate.class);
 
+    public static final String PYTHON_OVER_AGGREGATE_TRANSFORMATION = "python-over-aggregate";
+
+    public static final String FIELD_NAME_OVER_SPEC = "overSpec";
+
     private static final String
             ARROW_PYTHON_OVER_WINDOW_RANGE_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME =
                     "org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream."
@@ -89,6 +105,7 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
                     "org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream."
                             + "StreamArrowPythonProcTimeBoundedRowsOperator";
 
+    @JsonProperty(FIELD_NAME_OVER_SPEC)
     private final OverSpec overSpec;
 
     public StreamExecPythonOverAggregate(
@@ -108,14 +125,15 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
                 description);
     }
 
+    @JsonCreator
     public StreamExecPythonOverAggregate(
-            int id,
-            ExecNodeContext context,
-            ReadableConfig persistedConfig,
-            OverSpec overSpec,
-            List<InputProperty> inputProperties,
-            RowType outputType,
-            String description) {
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_OVER_SPEC) OverSpec overSpec,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(id, context, persistedConfig, inputProperties, outputType, description);
         checkArgument(inputProperties.size() == 1);
         this.overSpec = checkNotNull(overSpec);
@@ -244,8 +262,7 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
 
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                createTransformationMeta(PYTHON_OVER_AGGREGATE_TRANSFORMATION, config),
                 pythonOperator,
                 InternalTypeInfo.of(outputRowType),
                 inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 2da4fffb1c4..1743d645a7b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -138,6 +138,11 @@ public final class ExecNodeMetadataUtil {
                     add(StreamExecWindowJoin.class);
                     add(StreamExecWindowRank.class);
                     add(StreamExecWindowTableFunction.class);
+                    add(StreamExecPythonCalc.class);
+                    add(StreamExecPythonCorrelate.class);
+                    add(StreamExecPythonGroupAggregate.class);
+                    add(StreamExecPythonGroupWindowAggregate.class);
+                    add(StreamExecPythonOverAggregate.class);
                 }
             };
 
@@ -158,11 +163,6 @@ public final class ExecNodeMetadataUtil {
                     add(StreamExecLegacyTableSourceScan.class);
                     add(StreamExecLegacySink.class);
                     add(StreamExecGroupTableAggregate.class);
-                    add(StreamExecPythonCalc.class);
-                    add(StreamExecPythonCorrelate.class);
-                    add(StreamExecPythonGroupAggregate.class);
-                    add(StreamExecPythonGroupWindowAggregate.class);
-                    add(StreamExecPythonOverAggregate.class);
                     add(StreamExecPythonGroupTableAggregate.class);
                     add(StreamExecSort.class);
                     add(StreamExecMultipleInput.class);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
new file mode 100644
index 00000000000..07a367d6d24
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.BooleanPythonScalarFunction;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for Python calc. */
+public class PythonCalcJsonPlanTest extends TableTestBase {
+
+    private StreamTableTestUtil util;
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        util = streamTestUtil(TableConfig.getDefault());
+        tEnv = util.getTableEnv();
+
+        String srcTableDdl =
+                "CREATE TABLE MyTable (\n"
+                        + "  a bigint,\n"
+                        + "  b int not null,\n"
+                        + "  c varchar,\n"
+                        + "  d timestamp(3)\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'bounded' = 'false')";
+        tEnv.executeSql(srcTableDdl);
+    }
+
+    @Test
+    public void testPythonCalc() {
+        tEnv.createTemporaryFunction("pyFunc", new PythonScalarFunction("pyFunc"));
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  b int\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan("insert into MySink select a, pyFunc(b, b) from MyTable");
+    }
+
+    @Test
+    public void testPythonFunctionInWhereClause() {
+        tEnv.createTemporaryFunction("pyFunc", new BooleanPythonScalarFunction("pyFunc"));
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  b int\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan("insert into MySink select a, b from MyTable where pyFunc(b, b + 1)");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java
new file mode 100644
index 00000000000..456df0dd0ed
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
+import org.apache.flink.table.planner.utils.MockPythonTableFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for Python correlate. */
+public class PythonCorrelateJsonPlanTest extends TableTestBase {
+    private StreamTableTestUtil util;
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        TableConfig tableConfig = TableConfig.getDefault();
+        util = streamTestUtil(tableConfig);
+        tEnv = util.getTableEnv();
+        tEnv.createTemporaryFunction("TableFunc", new MockPythonTableFunction());
+        tEnv.createTemporaryFunction("pyFunc", new PythonScalarFunction("pyFunc"));
+
+        String srcTableDdl =
+                "CREATE TABLE MyTable (\n"
+                        + "  a int,\n"
+                        + "  b int,\n"
+                        + "  c int,\n"
+                        + "  d timestamp(3)\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'bounded' = 'false')";
+        tEnv.executeSql(srcTableDdl);
+    }
+
+    @Test
+    public void testPythonTableFunction() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a int,\n"
+                        + "  b int\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+
+        String sqlQuery =
+                "INSERT INTO MySink SELECT x, y FROM MyTable "
+                        + "LEFT JOIN LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) ON TRUE";
+        util.verifyJsonPlan(sqlQuery);
+    }
+
+    @Test
+    public void testJoinWithFilter() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a int,\n"
+                        + "  b int\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+
+        String sqlQuery =
+                "INSERT INTO MySink SELECT x, y FROM MyTable, "
+                        + "LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) WHERE x = a and y + 1 = y * y";
+        util.verifyJsonPlan(sqlQuery);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest.java
new file mode 100644
index 00000000000..5b50a2597cb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.TestPythonAggregateFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for Python group aggregate. */
+public class PythonGroupAggregateJsonPlanTest extends TableTestBase {
+
+    private StreamTableTestUtil util;
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        util = streamTestUtil(TableConfig.getDefault());
+        tEnv = util.getTableEnv();
+
+        String srcTableDdl =
+                "CREATE TABLE MyTable (\n"
+                        + "  a int not null,\n"
+                        + "  b int not null,\n"
+                        + "  c int not null,\n"
+                        + "  d bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'bounded' = 'false')";
+        tEnv.executeSql(srcTableDdl);
+        tEnv.createTemporarySystemFunction("pyFunc", new TestPythonAggregateFunction());
+    }
+
+    @Test
+    public void tesPythonAggCallsWithGroupBy() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  b bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan(
+                "insert into MySink select b, "
+                        + "pyFunc(a, c) filter (where b > 1) "
+                        + "from MyTable group by b");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest.java
new file mode 100644
index 00000000000..afcd4a86fe1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.TestPythonAggregateFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for Python group window aggregate. */
+public class PythonGroupWindowAggregateJsonPlanTest extends TableTestBase {
+    private StreamTableTestUtil util;
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        util = streamTestUtil(TableConfig.getDefault());
+        tEnv = util.getTableEnv();
+
+        String srcTableDdl =
+                "CREATE TABLE MyTable (\n"
+                        + " a INT NOT NULL,\n"
+                        + " b BIGINT,\n"
+                        + " c VARCHAR,\n"
+                        + " `rowtime` AS TO_TIMESTAMP(c),\n"
+                        + " proctime as PROCTIME(),\n"
+                        + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values')\n";
+        tEnv.executeSql(srcTableDdl);
+        tEnv.createTemporarySystemFunction("pyFunc", new TestPythonAggregateFunction());
+    }
+
+    @Test
+    public void testEventTimeTumbleWindow() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + " b BIGINT,\n"
+                        + " window_start TIMESTAMP(3),\n"
+                        + " window_end TIMESTAMP(3),\n"
+                        + " c BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values')\n";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan(
+                "insert into MySink select\n"
+                        + "  b,\n"
+                        + "  TUMBLE_START(rowtime, INTERVAL '5' SECOND) as window_start,\n"
+                        + "  TUMBLE_END(rowtime, INTERVAL '5' SECOND) as window_end,\n"
+                        + "  pyFunc(a, a + 1)\n"
+                        + "FROM MyTable\n"
+                        + "GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)");
+    }
+
+    @Test
+    public void testProcTimeTumbleWindow() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + " b BIGINT,\n"
+                        + " window_end TIMESTAMP(3),\n"
+                        + " c BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values')\n";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan(
+                "insert into MySink select\n"
+                        + "  b,\n"
+                        + "  TUMBLE_END(proctime, INTERVAL '15' MINUTE) as window_end,\n"
+                        + "  pyFunc(a, a + 1)\n"
+                        + "FROM MyTable\n"
+                        + "GROUP BY b, TUMBLE(proctime, INTERVAL '15' MINUTE)");
+    }
+
+    @Test
+    public void testEventTimeHopWindow() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + " b BIGINT,\n"
+                        + " c BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values')\n";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan(
+                "insert into MySink select\n"
+                        + "  b,\n"
+                        + "  pyFunc(a, a + 1)\n"
+                        + "FROM MyTable\n"
+                        + "GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)");
+    }
+
+    @Test
+    public void testProcTimeHopWindow() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + " b BIGINT,\n"
+                        + " c BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values')\n";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan(
+                "insert into MySink select\n"
+                        + "  b,\n"
+                        + "  pyFunc(a, a + 1)\n"
+                        + "FROM MyTable\n"
+                        + "GROUP BY b, HOP(proctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)");
+    }
+
+    @Test
+    public void testEventTimeSessionWindow() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + " b BIGINT,\n"
+                        + " c BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values')\n";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan(
+                "insert into MySink select\n"
+                        + "  b,\n"
+                        + "  pyFunc(a, a + 1)\n"
+                        + "FROM MyTable\n"
+                        + "GROUP BY b, Session(rowtime, INTERVAL '10' SECOND)");
+    }
+
+    @Test
+    public void testProcTimeSessionWindow() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + " b BIGINT,\n"
+                        + " c BIGINT\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values')\n";
+        tEnv.executeSql(sinkTableDdl);
+        util.verifyJsonPlan(
+                "insert into MySink select\n"
+                        + "  b,\n"
+                        + "  pyFunc(a, a + 1)\n"
+                        + "FROM MyTable\n"
+                        + "GROUP BY b, Session(proctime, INTERVAL '10' MINUTE)");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest.java
new file mode 100644
index 00000000000..ec5f4e3f967
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.PandasAggregateFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for Python over aggregate. */
+public class PythonOverAggregateJsonPlanTest extends TableTestBase {
+    private StreamTableTestUtil util;
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        util = streamTestUtil(TableConfig.getDefault());
+        tEnv = util.getTableEnv();
+        String srcTableDdl =
+                "CREATE TABLE MyTable (\n"
+                        + "  a int,\n"
+                        + "  b varchar,\n"
+                        + "  c int not null,\n"
+                        + "  rowtime timestamp(3),\n"
+                        + "  proctime as PROCTIME(),\n"
+                        + "  watermark for rowtime as rowtime"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'bounded' = 'false')";
+        tEnv.executeSql(srcTableDdl);
+        tEnv.createTemporarySystemFunction("pyFunc", new PandasAggregateFunction());
+    }
+
+    @Test
+    public void testProcTimeBoundedPartitionedRangeOver() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  b bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        String sql =
+                "insert into MySink SELECT a,\n"
+                        + "    pyFunc(c, c) OVER (PARTITION BY a ORDER BY proctime\n"
+                        + "        RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW)\n"
+                        + "FROM MyTable";
+        util.verifyJsonPlan(sql);
+    }
+
+    @Test
+    public void testProcTimeBoundedNonPartitionedRangeOver() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  b bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        String sql =
+                "insert into MySink SELECT a,\n"
+                        + "    pyFunc(c, c) OVER (ORDER BY proctime\n"
+                        + "        RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)\n"
+                        + " FROM MyTable";
+        util.verifyJsonPlan(sql);
+    }
+
+    @Test
+    public void testProcTimeUnboundedPartitionedRangeOver() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  b bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        String sql =
+                "insert into MySink SELECT a,\n"
+                        + "    pyFunc(c, c) OVER (PARTITION BY a ORDER BY proctime RANGE UNBOUNDED PRECEDING)\n"
+                        + "FROM MyTable";
+        util.verifyJsonPlan(sql);
+    }
+
+    @Test
+    public void testRowTimeBoundedPartitionedRowsOver() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  b bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        String sql =
+                "insert into MySink SELECT a,\n"
+                        + "    pyFunc(c, c) OVER (PARTITION BY a ORDER BY rowtime\n"
+                        + "        ROWS BETWEEN 5 preceding AND CURRENT ROW)\n"
+                        + "FROM MyTable";
+        util.verifyJsonPlan(sql);
+    }
+
+    @Test
+    public void testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a bigint,\n"
+                        + "  b bigint\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+        String sql =
+                "insert into MySink SELECT a, "
+                        + "  pyFunc(c, c) OVER ("
+                        + "    PARTITION BY a ORDER BY proctime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) "
+                        + "FROM MyTable";
+        util.verifyJsonPlan(sql);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
new file mode 100644
index 00000000000..331f7054703
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
@@ -0,0 +1,133 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ] ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-python-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>",
+    "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "table-sink-class" : "DEFAULT"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out
new file mode 100644
index 00000000000..bca1175329b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out
@@ -0,0 +1,216 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      }, {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ] ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$+$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1",
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `f0` INT NOT NULL>",
+    "description" : "Calc(select=[a, b, (b + 1) AS f0])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-python-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN NOT NULL"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `f0` BOOLEAN NOT NULL>",
+    "description" : "PythonCalc(select=[a, b, pyFunc(b, f0) AS f0])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "BOOLEAN NOT NULL"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "Calc(select=[a, b], where=[f0])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "table-sink-class" : "DEFAULT"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
new file mode 100644
index 00000000000..91a1a1ced71
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
@@ -0,0 +1,264 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$*$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3), `f0` INT>",
+    "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-python-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`TableFunc`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : "INT"
+      }, {
+        "kind" : "CALL",
+        "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        }, {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : "INT"
+        } ],
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "ROW<`x` INT, `y` INT> NOT NULL"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3), `f0` INT, `x` INT, `y` INT>",
+    "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,x,y], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER x, INTEGER y)], joinType=[INNER])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "INT"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$AND$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$=$1",
+        "operands" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$+$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 6,
+            "type" : "INT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : "1",
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "INT"
+        }, {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$*$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 6,
+            "type" : "INT"
+          }, {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 6,
+            "type" : "INT"
+          } ],
+          "type" : "INT"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$=$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 5,
+          "type" : "INT"
+        }, {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        } ],
+        "type" : "BOOLEAN"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`x` INT, `y` INT>",
+    "description" : "Calc(select=[x, y], where=[(((y + 1) = (y * y)) AND (x = a))])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "table-sink-class" : "DEFAULT"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`x` INT, `y` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
new file mode 100644
index 00000000000..137419d1818
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
@@ -0,0 +1,210 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$*$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3), `f0` INT>",
+    "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-python-correlate_1",
+    "joinType" : "LEFT",
+    "functionCall" : {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`TableFunc`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : "INT"
+      }, {
+        "kind" : "CALL",
+        "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        }, {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : "INT"
+        } ],
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "ROW<`x` INT, `y` INT> NOT NULL"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3), `f0` INT, `x` INT, `y` INT>",
+    "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,x,y], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER x, INTEGER y)], joinType=[LEFT])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`x` INT, `y` INT>",
+    "description" : "Calc(select=[x, y])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "table-sink-class" : "DEFAULT"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`x` INT, `y` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out
new file mode 100644
index 00000000000..3a983fd2938
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out
@@ -0,0 +1,232 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "d",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 1 ], [ 0 ], [ 2 ] ],
+        "producedType" : "ROW<`b` INT NOT NULL, `a` INT NOT NULL, `c` INT NOT NULL> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`b` INT NOT NULL, `a` INT NOT NULL, `c` INT NOT NULL> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`b` INT NOT NULL, `a` INT NOT NULL, `c` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[b, a, c], metadata=[]]], fields=[b, a, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$>$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1",
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` INT NOT NULL, `a` INT NOT NULL, `c` INT NOT NULL, `$f3` BOOLEAN NOT NULL>",
+    "description" : "Calc(select=[b, a, c, (b > 1) AS $f3])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` INT NOT NULL, `a` INT NOT NULL, `c` INT NOT NULL, `$f3` BOOLEAN NOT NULL>",
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-python-group-aggregate_1",
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$1",
+      "systemName" : "pyFunc",
+      "argList" : [ 1, 2 ],
+      "filterArg" : 3,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    } ],
+    "aggCallNeedRetractions" : [ false ],
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` INT NOT NULL, `EXPR$1` BIGINT>",
+    "description" : "PythonGroupAggregate(groupBy=[b], select=[b, pyFunc(a, c) FILTER $f3 AS EXPR$1])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` BIGINT>",
+    "description" : "Calc(select=[CAST(b AS BIGINT) AS a, EXPR$1 AS b])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "sink-insert-only" : "false",
+            "table-sink-class" : "DEFAULT",
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
new file mode 100644
index 00000000000..4eb78b8425c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
@@ -0,0 +1,390 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`c`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 3,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `rowtime` TIMESTAMP(3), `a` INT NOT NULL>",
+    "description" : "Calc(select=[b, TO_TIMESTAMP(c) AS rowtime, a])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 1,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$+$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1",
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Calc(select=[b, rowtime, a, (a + 1) AS $f3])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-python-group-window-aggregate_1",
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$1",
+      "systemName" : "pyFunc",
+      "argList" : [ 2, 3 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    } ],
+    "window" : {
+      "kind" : "SLIDING",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "rowtime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "isTimeWindow" : true,
+      "size" : "PT10S",
+      "slide" : "PT5S"
+    },
+    "namedWindowProperties" : [ ],
+    "generateUpdateBefore" : false,
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+    "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 10000, 5000)], select=[b, pyFunc(a, $f3) AS EXPR$1])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
new file mode 100644
index 00000000000..0b404642a70
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
@@ -0,0 +1,388 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`c`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 3,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `rowtime` TIMESTAMP(3), `a` INT NOT NULL>",
+    "description" : "Calc(select=[b, TO_TIMESTAMP(c) AS rowtime, a])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 1,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$+$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1",
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Calc(select=[b, rowtime, a, (a + 1) AS $f3])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-python-group-window-aggregate_1",
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$1",
+      "systemName" : "pyFunc",
+      "argList" : [ 2, 3 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    } ],
+    "window" : {
+      "kind" : "SESSION",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "rowtime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "gap" : "PT10S"
+    },
+    "namedWindowProperties" : [ ],
+    "generateUpdateBefore" : false,
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+    "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SessionGroupWindow('w$, rowtime, 10000)], select=[b, pyFunc(a, $f3) AS EXPR$1])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
new file mode 100644
index 00000000000..5adfce323c1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -0,0 +1,513 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`c`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 3,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `rowtime` TIMESTAMP(3), `a` INT NOT NULL>",
+    "description" : "Calc(select=[b, TO_TIMESTAMP(c) AS rowtime, a])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 1,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$+$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1",
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Calc(select=[b, rowtime, a, (a + 1) AS $f3])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-python-group-window-aggregate_1",
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$3",
+      "systemName" : "pyFunc",
+      "argList" : [ 2, 3 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    } ],
+    "window" : {
+      "kind" : "TUMBLING",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "rowtime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      },
+      "isTimeWindow" : true,
+      "size" : "PT5S"
+    },
+    "namedWindowProperties" : [ {
+      "name" : "w$start",
+      "property" : {
+        "kind" : "WindowStart",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    }, {
+      "name" : "w$end",
+      "property" : {
+        "kind" : "WindowEnd",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    }, {
+      "name" : "w$rowtime",
+      "property" : {
+        "kind" : "Rowtime",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    }, {
+      "name" : "w$proctime",
+      "property" : {
+        "kind" : "Proctime",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        }
+      }
+    } ],
+    "generateUpdateBefore" : false,
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "EXPR$3",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "w$start",
+        "fieldType" : "TIMESTAMP(3) NOT NULL"
+      }, {
+        "name" : "w$end",
+        "fieldType" : "TIMESTAMP(3) NOT NULL"
+      }, {
+        "name" : "w$rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "w$proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, pyFunc(a, $f3) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$3` BIGINT>",
+    "description" : "Calc(select=[b, w$start AS window_start, w$end AS window_end, EXPR$3])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "window_start",
+              "dataType" : "TIMESTAMP(3)"
+            }, {
+              "name" : "window_end",
+              "dataType" : "TIMESTAMP(3)"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$3` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, window_start, window_end, EXPR$3])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
new file mode 100644
index 00000000000..f0e145c90e1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
@@ -0,0 +1,443 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`c`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 3,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "c",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : "TIMESTAMP(3)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "c",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$+$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1",
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Calc(select=[b, proctime, a, (a + 1) AS $f3])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-python-group-window-aggregate_1",
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$1",
+      "systemName" : "pyFunc",
+      "argList" : [ 2, 3 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    } ],
+    "window" : {
+      "kind" : "SLIDING",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "proctime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "isTimeWindow" : true,
+      "size" : "PT10M",
+      "slide" : "PT5M"
+    },
+    "namedWindowProperties" : [ ],
+    "generateUpdateBefore" : false,
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+    "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, proctime, 600000, 300000)], select=[b, pyFunc(a, $f3) AS EXPR$1])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
new file mode 100644
index 00000000000..cb5438d8ec5
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
@@ -0,0 +1,441 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`c`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 3,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "c",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : "TIMESTAMP(3)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "c",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$+$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1",
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Calc(select=[b, proctime, a, (a + 1) AS $f3])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-python-group-window-aggregate_1",
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$1",
+      "systemName" : "pyFunc",
+      "argList" : [ 2, 3 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    } ],
+    "window" : {
+      "kind" : "SESSION",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "proctime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "gap" : "PT10M"
+    },
+    "namedWindowProperties" : [ ],
+    "generateUpdateBefore" : false,
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+    "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SessionGroupWindow('w$, proctime, 600000)], select=[b, pyFunc(a, $f3) AS EXPR$1])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
new file mode 100644
index 00000000000..08990c9201e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
@@ -0,0 +1,542 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`c`)"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 3,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "c",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : "TIMESTAMP(3)"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "c",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$+$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1",
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Calc(select=[b, proctime, a, (a + 1) AS $f3])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$f3",
+        "fieldType" : "INT NOT NULL"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-python-group-window-aggregate_1",
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "EXPR$2",
+      "systemName" : "pyFunc",
+      "argList" : [ 2, 3 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT"
+    } ],
+    "window" : {
+      "kind" : "TUMBLING",
+      "alias" : {
+        "name" : "w$",
+        "type" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "timeField" : {
+        "fieldName" : "proctime",
+        "fieldIndex" : 1,
+        "inputIndex" : 0,
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      },
+      "isTimeWindow" : true,
+      "size" : "PT15M"
+    },
+    "namedWindowProperties" : [ {
+      "name" : "w$start",
+      "property" : {
+        "kind" : "WindowStart",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+            "nullable" : false,
+            "precision" : 3,
+            "kind" : "PROCTIME"
+          }
+        }
+      }
+    }, {
+      "name" : "w$end",
+      "property" : {
+        "kind" : "WindowEnd",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+            "nullable" : false,
+            "precision" : 3,
+            "kind" : "PROCTIME"
+          }
+        }
+      }
+    }, {
+      "name" : "w$proctime",
+      "property" : {
+        "kind" : "Proctime",
+        "reference" : {
+          "name" : "w$",
+          "type" : {
+            "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+            "nullable" : false,
+            "precision" : 3,
+            "kind" : "PROCTIME"
+          }
+        }
+      }
+    } ],
+    "generateUpdateBefore" : false,
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "EXPR$2",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "w$start",
+        "fieldType" : "TIMESTAMP(3) NOT NULL"
+      }, {
+        "name" : "w$end",
+        "fieldType" : "TIMESTAMP(3) NOT NULL"
+      }, {
+        "name" : "w$proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, proctime, 900000)], properties=[w$start, w$end, w$proctime], select=[b, pyFunc(a, $f3) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$2` BIGINT>",
+    "description" : "Calc(select=[b, w$end AS window_end, EXPR$2])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "window_end",
+              "dataType" : "TIMESTAMP(3)"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$2` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, window_end, EXPR$2])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
new file mode 100644
index 00000000000..acf9749ab4e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
@@ -0,0 +1,448 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "rowtime",
+              "dataType" : {
+                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+                "precision" : 3,
+                "kind" : "ROWTIME"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 2 ], [ 0 ], [ 3 ] ],
+        "producedType" : "ROW<`c` INT NOT NULL, `a` INT, `rowtime` TIMESTAMP(3)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`c` INT NOT NULL, `a` INT, `rowtime` TIMESTAMP(3)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`c` INT NOT NULL, `a` INT, `rowtime` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[c, a, rowtime], metadata=[]]], fields=[c, a, rowtime])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : "TIMESTAMP(3)"
+      } ]
+    },
+    "description" : "Calc(select=[c, PROCTIME() AS proctime, a, rowtime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "$2",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "Calc(select=[c, proctime, CAST(a AS BIGINT) AS $2])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "$2",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "Exchange(distribution=[single])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-python-over-aggregate_1",
+    "overSpec" : {
+      "partition" : {
+        "fields" : [ ]
+      },
+      "groups" : [ {
+        "orderBy" : {
+          "fields" : [ {
+            "index" : 1,
+            "isAscending" : true,
+            "nullIsLast" : false
+          } ]
+        },
+        "isRows" : false,
+        "lowerBound" : {
+          "kind" : "BOUNDED_WINDOW",
+          "isPreceding" : true,
+          "offset" : {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 3,
+            "type" : "INTERVAL SECOND(6) NOT NULL"
+          }
+        },
+        "upperBound" : {
+          "kind" : "CURRENT_ROW"
+        },
+        "aggCalls" : [ {
+          "name" : "w0$o0",
+          "systemName" : "pyFunc",
+          "argList" : [ 0, 0 ],
+          "filterArg" : -1,
+          "distinct" : false,
+          "approximate" : false,
+          "ignoreNulls" : false,
+          "type" : "BIGINT"
+        } ]
+      } ],
+      "constants" : [ {
+        "kind" : "LITERAL",
+        "value" : "10000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "originalInputFields" : 3
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "$2",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "w0$o0",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "PythonOverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[c, proctime, $2, pyFunc(c, c) AS w0$o0])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+    "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "sink-insert-only" : "false",
+            "table-sink-class" : "DEFAULT",
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
new file mode 100644
index 00000000000..28c139943f9
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
@@ -0,0 +1,462 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "rowtime",
+              "dataType" : {
+                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+                "precision" : 3,
+                "kind" : "ROWTIME"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ],
+        "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "rowtime",
+        "fieldType" : "TIMESTAMP(3)"
+      } ]
+    },
+    "description" : "Calc(select=[a, c, PROCTIME() AS proctime, rowtime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "$3",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "Calc(select=[a, c, proctime, CAST(a AS BIGINT) AS $3])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "$3",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-python-over-aggregate_1",
+    "overSpec" : {
+      "partition" : {
+        "fields" : [ 0 ]
+      },
+      "groups" : [ {
+        "orderBy" : {
+          "fields" : [ {
+            "index" : 2,
+            "isAscending" : true,
+            "nullIsLast" : false
+          } ]
+        },
+        "isRows" : false,
+        "lowerBound" : {
+          "kind" : "BOUNDED_WINDOW",
+          "isPreceding" : true,
+          "offset" : {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 4,
+            "type" : "INTERVAL HOUR NOT NULL"
+          }
+        },
+        "upperBound" : {
+          "kind" : "CURRENT_ROW"
+        },
+        "aggCalls" : [ {
+          "name" : "w0$o0",
+          "systemName" : "pyFunc",
+          "argList" : [ 1, 1 ],
+          "filterArg" : -1,
+          "distinct" : false,
+          "approximate" : false,
+          "ignoreNulls" : false,
+          "type" : "BIGINT"
+        } ]
+      } ],
+      "constants" : [ {
+        "kind" : "LITERAL",
+        "value" : "7200000",
+        "type" : "INTERVAL HOUR NOT NULL"
+      } ],
+      "originalInputFields" : 4
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "$3",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "w0$o0",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, pyFunc(c, c) AS w0$o0])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+    "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "sink-insert-only" : "false",
+            "table-sink-class" : "DEFAULT",
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
new file mode 100644
index 00000000000..2a37b63d4df
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
@@ -0,0 +1,392 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "rowtime",
+              "dataType" : {
+                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+                "precision" : 3,
+                "kind" : "ROWTIME"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ],
+        "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$2",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "$3",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Calc(select=[a, c, CAST(a AS BIGINT) AS $2, PROCTIME() AS $3])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$2",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "$3",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-python-over-aggregate_1",
+    "overSpec" : {
+      "partition" : {
+        "fields" : [ 0 ]
+      },
+      "groups" : [ {
+        "orderBy" : {
+          "fields" : [ {
+            "index" : 3,
+            "isAscending" : true,
+            "nullIsLast" : false
+          } ]
+        },
+        "isRows" : true,
+        "lowerBound" : {
+          "kind" : "BOUNDED_WINDOW",
+          "isPreceding" : true,
+          "offset" : {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 4,
+            "type" : "INT NOT NULL"
+          }
+        },
+        "upperBound" : {
+          "kind" : "CURRENT_ROW"
+        },
+        "aggCalls" : [ {
+          "name" : "w0$o0",
+          "systemName" : "pyFunc",
+          "argList" : [ 1, 1 ],
+          "filterArg" : -1,
+          "distinct" : false,
+          "approximate" : false,
+          "ignoreNulls" : false,
+          "type" : "BIGINT"
+        } ]
+      } ],
+      "constants" : [ {
+        "kind" : "LITERAL",
+        "value" : "4",
+        "type" : "INT NOT NULL"
+      } ],
+      "originalInputFields" : 4
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "$2",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "$3",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "w0$o0",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[$3 ASC], window=[ ROWS BETWEEN 4 PRECEDING AND CURRENT ROW], select=[a, c, $2, $3, pyFunc(c, c) AS w0$o0])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+    "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "sink-insert-only" : "false",
+            "table-sink-class" : "DEFAULT",
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
new file mode 100644
index 00000000000..0b95247d2c4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
@@ -0,0 +1,452 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "rowtime",
+              "dataType" : {
+                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+                "precision" : 3,
+                "kind" : "ROWTIME"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ],
+        "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$PROCTIME$1",
+      "operands" : [ ],
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "rowtime",
+        "fieldType" : "TIMESTAMP(3)"
+      } ]
+    },
+    "description" : "Calc(select=[a, c, PROCTIME() AS proctime, rowtime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+        "nullable" : false,
+        "precision" : 3,
+        "kind" : "PROCTIME"
+      }
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "$3",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "Calc(select=[a, c, proctime, CAST(a AS BIGINT) AS $3])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "$3",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-python-over-aggregate_1",
+    "overSpec" : {
+      "partition" : {
+        "fields" : [ 0 ]
+      },
+      "groups" : [ {
+        "orderBy" : {
+          "fields" : [ {
+            "index" : 2,
+            "isAscending" : true,
+            "nullIsLast" : false
+          } ]
+        },
+        "isRows" : false,
+        "lowerBound" : {
+          "kind" : "UNBOUNDED_PRECEDING"
+        },
+        "upperBound" : {
+          "kind" : "CURRENT_ROW"
+        },
+        "aggCalls" : [ {
+          "name" : "w0$o0",
+          "systemName" : "pyFunc",
+          "argList" : [ 1, 1 ],
+          "filterArg" : -1,
+          "distinct" : false,
+          "approximate" : false,
+          "ignoreNulls" : false,
+          "type" : "BIGINT"
+        } ]
+      } ],
+      "constants" : [ ],
+      "originalInputFields" : 4
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "proctime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "PROCTIME"
+        }
+      }, {
+        "name" : "$3",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "w0$o0",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, pyFunc(c, c) AS w0$o0])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+    "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "sink-insert-only" : "false",
+            "table-sink-class" : "DEFAULT",
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
new file mode 100644
index 00000000000..d7e652d3bb1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
@@ -0,0 +1,387 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "rowtime",
+              "dataType" : {
+                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+                "precision" : 3,
+                "kind" : "ROWTIME"
+              }
+            }, {
+              "name" : "proctime",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$PROCTIME$1",
+                  "operands" : [ ],
+                  "type" : {
+                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                    "nullable" : false,
+                    "precision" : 3,
+                    "kind" : "PROCTIME"
+                  }
+                },
+                "serializableString" : "PROCTIME()"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "rowtime",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "INPUT_REF",
+                  "inputIndex" : 3,
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`rowtime`"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "bounded" : "false"
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ],
+        "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      }
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "$3",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "Calc(select=[a, c, rowtime, CAST(a AS BIGINT) AS $3])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "$3",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-python-over-aggregate_1",
+    "overSpec" : {
+      "partition" : {
+        "fields" : [ 0 ]
+      },
+      "groups" : [ {
+        "orderBy" : {
+          "fields" : [ {
+            "index" : 2,
+            "isAscending" : true,
+            "nullIsLast" : false
+          } ]
+        },
+        "isRows" : true,
+        "lowerBound" : {
+          "kind" : "BOUNDED_WINDOW",
+          "isPreceding" : true,
+          "offset" : {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 4,
+            "type" : "INT NOT NULL"
+          }
+        },
+        "upperBound" : {
+          "kind" : "CURRENT_ROW"
+        },
+        "aggCalls" : [ {
+          "name" : "w0$o0",
+          "systemName" : "pyFunc",
+          "argList" : [ 1, 1 ],
+          "filterArg" : -1,
+          "distinct" : false,
+          "approximate" : false,
+          "ignoreNulls" : false,
+          "type" : "BIGINT"
+        } ]
+      } ],
+      "constants" : [ {
+        "kind" : "LITERAL",
+        "value" : "5",
+        "type" : "INT NOT NULL"
+      } ],
+      "originalInputFields" : 4
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "a",
+        "fieldType" : "INT"
+      }, {
+        "name" : "c",
+        "fieldType" : "INT NOT NULL"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "$3",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "w0$o0",
+        "fieldType" : "BIGINT"
+      } ]
+    },
+    "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, pyFunc(c, c) AS w0$o0])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+    "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "sink-insert-only" : "false",
+            "table-sink-class" : "DEFAULT",
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file