You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/08/01 11:08:59 UTC
[flink] branch master updated: [FLINK-28752][python][table-planner] Add the json plan support in Python UDFs
This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6ff6978c0eb [FLINK-28752][python][table-planner] Add the json plan support in Python UDFs
6ff6978c0eb is described below
commit 6ff6978c0eb56a9cd036a6bafdd049d0d8de9ba8
Author: huangxingbo <hx...@apache.org>
AuthorDate: Sat Jul 30 19:07:40 2022 +0800
[FLINK-28752][python][table-planner] Add the json plan support in Python UDFs
This closes #20398.
---
.../pyflink/table/tests/test_pandas_udaf.py | 3 +-
flink-python/pyflink/table/tests/test_udaf.py | 7 +-
flink-python/pyflink/table/tests/test_udf.py | 3 +-
flink-python/pyflink/table/tests/test_udtf.py | 3 +-
.../nodes/exec/common/CommonExecPythonCalc.java | 10 +-
.../exec/common/CommonExecPythonCorrelate.java | 12 +-
.../nodes/exec/stream/StreamExecPythonCalc.java | 26 +-
.../exec/stream/StreamExecPythonCorrelate.java | 28 +-
.../stream/StreamExecPythonGroupAggregate.java | 42 +-
.../StreamExecPythonGroupWindowAggregate.java | 56 ++-
.../exec/stream/StreamExecPythonOverAggregate.java | 35 +-
.../planner/plan/utils/ExecNodeMetadataUtil.java | 10 +-
.../nodes/exec/stream/PythonCalcJsonPlanTest.java | 81 +++
.../exec/stream/PythonCorrelateJsonPlanTest.java | 89 ++++
.../stream/PythonGroupAggregateJsonPlanTest.java | 70 +++
.../PythonGroupWindowAggregateJsonPlanTest.java | 161 ++++++
.../stream/PythonOverAggregateJsonPlanTest.java | 147 ++++++
.../testPythonCalc.out | 133 +++++
.../testPythonFunctionInWhereClause.out | 216 ++++++++
.../testJoinWithFilter.out | 264 ++++++++++
.../testPythonTableFunction.out | 210 ++++++++
.../tesPythonAggCallsWithGroupBy.out | 232 +++++++++
.../testEventTimeHopWindow.out | 390 +++++++++++++++
.../testEventTimeSessionWindow.out | 388 +++++++++++++++
.../testEventTimeTumbleWindow.out | 513 +++++++++++++++++++
.../testProcTimeHopWindow.out | 443 +++++++++++++++++
.../testProcTimeSessionWindow.out | 441 +++++++++++++++++
.../testProcTimeTumbleWindow.out | 542 +++++++++++++++++++++
.../testProcTimeBoundedNonPartitionedRangeOver.out | 448 +++++++++++++++++
.../testProcTimeBoundedPartitionedRangeOver.out | 462 ++++++++++++++++++
...undedPartitionedRowsOverWithBuiltinProctime.out | 392 +++++++++++++++
.../testProcTimeUnboundedPartitionedRangeOver.out | 452 +++++++++++++++++
.../testRowTimeBoundedPartitionedRowsOver.out | 387 +++++++++++++++
33 files changed, 6623 insertions(+), 73 deletions(-)
diff --git a/flink-python/pyflink/table/tests/test_pandas_udaf.py b/flink-python/pyflink/table/tests/test_pandas_udaf.py
index 71cd6380f41..b0b0ec6ebc9 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udaf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udaf.py
@@ -742,7 +742,6 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
"+I[3, 2.0, 4]"])
os.remove(source_path)
- @unittest.skip("Python UDFs are currently unsupported in JSON plan")
def test_execute_over_aggregate_from_json_plan(self):
# create source file path
tmp_dir = self.tempdir
@@ -806,7 +805,7 @@ class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):
from source_table
""")
from py4j.java_gateway import get_method
- get_method(self.t_env._j_tenv.executePlan(json_plan), "await")()
+ get_method(json_plan.execute(), "await")()
import glob
lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
diff --git a/flink-python/pyflink/table/tests/test_udaf.py b/flink-python/pyflink/table/tests/test_udaf.py
index e6f96527ff7..56c59da0dd3 100644
--- a/flink-python/pyflink/table/tests/test_udaf.py
+++ b/flink-python/pyflink/table/tests/test_udaf.py
@@ -17,7 +17,6 @@
################################################################################
import collections
import datetime
-import unittest
from decimal import Decimal
import pandas as pd
@@ -808,7 +807,6 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
"+I[1, 2018-03-11T03:10, 2018-03-11T04:10, 2]",
"+I[1, 2018-03-11T04:20, 2018-03-11T04:50, 1]"])
- @unittest.skip("Python UDFs are currently unsupported in JSON plan")
def test_execute_group_aggregate_from_json_plan(self):
# create source file path
tmp_dir = self.tempdir
@@ -845,9 +843,8 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
"SELECT a, my_sum(b) FROM source_table "
"GROUP BY a")
from py4j.java_gateway import get_method
- get_method(self.t_env._j_tenv.executePlan(json_plan), "await")()
+ get_method(json_plan.execute(), "await")()
- @unittest.skip("Python UDFs are currently unsupported in JSON plan")
def test_execute_group_window_aggregate_from_json_plan(self):
# create source file path
tmp_dir = self.tempdir
@@ -906,7 +903,7 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
"a, b, "
"SESSION(rowtime, INTERVAL '30' MINUTE)")
from py4j.java_gateway import get_method
- get_method(self.t_env._j_tenv.executePlan(json_plan), "await")()
+ get_method(json_plan.execute(), "await")()
import glob
lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 34801d7b76a..8c5a1fbff02 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -741,7 +741,6 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
actual = source_sink_utils.results()
self.assert_equals(actual, ["+I[1970-01-01T00:00:00.123Z]"])
- @unittest.skip("Python UDFs are currently unsupported in JSON plan")
def test_execute_from_json_plan(self):
# create source file path
tmp_dir = self.tempdir
@@ -783,7 +782,7 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
"add_one(b) "
"FROM source_table")
from py4j.java_gateway import get_method
- get_method(self.t_env._j_tenv.executePlan(json_plan), "await")()
+ get_method(json_plan.execute(), "await")()
import glob
lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 5a730ffa5d8..f91c7ad0b50 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -75,7 +75,6 @@ class UserDefinedTableFunctionTests(object):
class PyFlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
PyFlinkStreamTableTestCase):
- @unittest.skip("Python UDFs are currently unsupported in JSON plan")
def test_execute_from_json_plan(self):
# create source file path
tmp_dir = self.tempdir
@@ -119,7 +118,7 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
" as T(x, y)"
" ON TRUE")
from py4j.java_gateway import get_method
- get_method(self.t_env._j_tenv.executePlan(json_plan), "await")()
+ get_method(json_plan.execute(), "await")()
import glob
lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
index 19eb4f38a5a..e102de9d063 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
@@ -48,6 +48,8 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexInputRef;
@@ -66,6 +68,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
implements SingleTransformationTranslator<RowData> {
+ public static final String PYTHON_CALC_TRANSFORMATION = "python-calc";
+
+ public static final String FIELD_NAME_PROJECTION = "projection";
+
private static final String PYTHON_SCALAR_FUNCTION_OPERATOR_NAME =
"org.apache.flink.table.runtime.operators.python.scalar."
+ "PythonScalarFunctionOperator";
@@ -78,6 +84,7 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
"org.apache.flink.table.runtime.operators.python.scalar.arrow."
+ "ArrowPythonScalarFunctionOperator";
+ @JsonProperty(FIELD_NAME_PROJECTION)
private final List<RexNode> projection;
public CommonExecPythonCalc(
@@ -171,8 +178,7 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationName(config),
- createTransformationDescription(config),
+ createTransformationMeta(PYTHON_CALC_TRANSFORMATION, config),
pythonOperator,
pythonOperatorResultTyeInfo,
inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
index 9c12d15329e..0de1483db13 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
@@ -46,6 +46,8 @@ import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
@@ -60,14 +62,21 @@ import static org.apache.flink.util.Preconditions.checkArgument;
public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
implements SingleTransformationTranslator<RowData> {
+ public static final String PYTHON_CORRELATE_TRANSFORMATION = "python-correlate";
+
+ public static final String FIELD_NAME_JOIN_TYPE = "joinType";
+ public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
+
private static final String PYTHON_TABLE_FUNCTION_OPERATOR_NAME =
"org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator";
private static final String EMBEDDED_PYTHON_TABLE_FUNCTION_OPERATOR_NAME =
"org.apache.flink.table.runtime.operators.python.table.EmbeddedPythonTableFunctionOperator";
+ @JsonProperty(FIELD_NAME_JOIN_TYPE)
private final FlinkJoinType joinType;
+ @JsonProperty(FIELD_NAME_FUNCTION_CALL)
private final RexCall invocation;
public CommonExecPythonCorrelate(
@@ -129,8 +138,7 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
pythonUdtfInputOffsets);
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationName(config),
- createTransformationDescription(config),
+ createTransformationMeta(PYTHON_CORRELATE_TRANSFORMATION, pythonConfig),
pythonOperator,
pythonOperatorOutputRowType,
inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java
index 496ca96c1eb..39e42fef14d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCalc.java
@@ -18,20 +18,31 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCalc;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rex.RexNode;
import java.util.Collections;
import java.util.List;
/** Stream {@link ExecNode} for Python ScalarFunctions. */
+@ExecNodeMetadata(
+ name = "stream-exec-python-calc",
+ version = 1,
+ producedTransformations = CommonExecPythonCalc.PYTHON_CALC_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_16,
+ minStateVersion = FlinkVersion.v1_16)
public class StreamExecPythonCalc extends CommonExecPythonCalc implements StreamExecNode<RowData> {
public StreamExecPythonCalc(
@@ -50,14 +61,15 @@ public class StreamExecPythonCalc extends CommonExecPythonCalc implements Stream
description);
}
+ @JsonCreator
public StreamExecPythonCalc(
- int id,
- ExecNodeContext context,
- ReadableConfig persistedConfig,
- List<RexNode> projection,
- List<InputProperty> inputProperties,
- RowType outputType,
- String description) {
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+ @JsonProperty(FIELD_NAME_PROJECTION) List<RexNode> projection,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, context, persistedConfig, projection, inputProperties, outputType, description);
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
index 0ce687a804c..bf215843ea5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
@@ -18,14 +18,19 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCorrelate;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
@@ -33,6 +38,12 @@ import java.util.Collections;
import java.util.List;
/** Stream exec node which matches along with join a Python user defined table function. */
+@ExecNodeMetadata(
+ name = "stream-exec-python-correlate",
+ version = 1,
+ producedTransformations = CommonExecPythonCorrelate.PYTHON_CORRELATE_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_16,
+ minStateVersion = FlinkVersion.v1_16)
public class StreamExecPythonCorrelate extends CommonExecPythonCorrelate
implements StreamExecNode<RowData> {
@@ -54,15 +65,16 @@ public class StreamExecPythonCorrelate extends CommonExecPythonCorrelate
description);
}
+ @JsonCreator
public StreamExecPythonCorrelate(
- int id,
- ExecNodeContext context,
- ReadableConfig persistedConfig,
- FlinkJoinType joinType,
- RexNode invocation,
- List<InputProperty> inputProperties,
- RowType outputType,
- String description) {
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+ @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType joinType,
+ @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(
id,
context,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
index 9b9daf48400..55a8a8cd3d5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -33,6 +34,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -47,6 +49,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.calcite.rel.core.AggregateCall;
import org.slf4j.Logger;
@@ -61,21 +64,35 @@ import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Stream {@link ExecNode} for Python unbounded group aggregate. */
+@ExecNodeMetadata(
+ name = "stream-exec-python-group-aggregate",
+ version = 1,
+ producedTransformations =
+ StreamExecPythonGroupAggregate.PYTHON_GROUP_AGGREGATE_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_16,
+ minStateVersion = FlinkVersion.v1_16)
public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase {
private static final Logger LOG = LoggerFactory.getLogger(StreamExecPythonGroupAggregate.class);
+ public static final String PYTHON_GROUP_AGGREGATE_TRANSFORMATION = "python-group-aggregate";
+
private static final String PYTHON_STREAM_AGGREAGTE_OPERATOR_NAME =
"org.apache.flink.table.runtime.operators.python.aggregate.PythonStreamGroupAggregateOperator";
+ @JsonProperty(FIELD_NAME_GROUPING)
private final int[] grouping;
+ @JsonProperty(FIELD_NAME_AGG_CALLS)
private final AggregateCall[] aggCalls;
+ @JsonProperty(FIELD_NAME_AGG_CALL_NEED_RETRACTIONS)
private final boolean[] aggCallNeedRetractions;
+ @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE)
private final boolean generateUpdateBefore;
+ @JsonProperty(FIELD_NAME_NEED_RETRACTION)
private final boolean needRetraction;
public StreamExecPythonGroupAggregate(
@@ -105,17 +122,17 @@ public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase {
@JsonCreator
public StreamExecPythonGroupAggregate(
- int id,
- ExecNodeContext context,
- ReadableConfig persistedConfig,
- int[] grouping,
- AggregateCall[] aggCalls,
- boolean[] aggCallNeedRetractions,
- boolean generateUpdateBefore,
- boolean needRetraction,
- List<InputProperty> inputProperties,
- RowType outputType,
- String description) {
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+ @JsonProperty(FIELD_NAME_GROUPING) int[] grouping,
+ @JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls,
+ @JsonProperty(FIELD_NAME_AGG_CALL_NEED_RETRACTIONS) boolean[] aggCallNeedRetractions,
+ @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean generateUpdateBefore,
+ @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, context, persistedConfig, inputProperties, outputType, description);
this.grouping = checkNotNull(grouping);
this.aggCalls = checkNotNull(aggCalls);
@@ -174,8 +191,7 @@ public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase {
OneInputTransformation<RowData, RowData> transform =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationName(config),
- createTransformationDescription(config),
+ createTransformationMeta(PYTHON_GROUP_AGGREGATE_TRANSFORMATION, config),
operator,
InternalTypeInfo.of(getOutputType()),
inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
index 5676dd02997..df725ff3d85 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -44,6 +45,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -72,6 +74,9 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rel.core.AggregateCall;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,11 +101,24 @@ import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Stream {@link ExecNode} for group window aggregate (Python user defined aggregate function). */
+@ExecNodeMetadata(
+ name = "stream-exec-python-group-window-aggregate",
+ version = 1,
+ producedTransformations =
+ StreamExecPythonGroupWindowAggregate.PYTHON_GROUP_WINDOW_AGGREGATE_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_16,
+ minStateVersion = FlinkVersion.v1_16)
public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBase {
private static final Logger LOGGER =
LoggerFactory.getLogger(StreamExecPythonGroupWindowAggregate.class);
+ public static final String PYTHON_GROUP_WINDOW_AGGREGATE_TRANSFORMATION =
+ "python-group-window-aggregate";
+
+ public static final String FIELD_NAME_WINDOW = "window";
+ public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = "namedWindowProperties";
+
private static final String ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME =
"org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream."
+ "StreamArrowPythonGroupWindowAggregateFunctionOperator";
@@ -117,16 +135,22 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
private static final String GENERAL_STREAM_PYTHON_CREATE_SESSION_GROUP_WINDOW_METHOD =
"createSessionGroupWindowAggregateOperator";
+ @JsonProperty(FIELD_NAME_GROUPING)
private final int[] grouping;
+ @JsonProperty(FIELD_NAME_AGG_CALLS)
private final AggregateCall[] aggCalls;
+ @JsonProperty(FIELD_NAME_WINDOW)
private final LogicalWindow window;
+ @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
private final NamedWindowProperty[] namedWindowProperties;
+ @JsonProperty(FIELD_NAME_NEED_RETRACTION)
private final boolean needRetraction;
+ @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE)
private final boolean generateUpdateBefore;
public StreamExecPythonGroupWindowAggregate(
@@ -156,19 +180,21 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
description);
}
+ @JsonCreator
public StreamExecPythonGroupWindowAggregate(
- int id,
- ExecNodeContext context,
- ReadableConfig persistedConfig,
- int[] grouping,
- AggregateCall[] aggCalls,
- LogicalWindow window,
- NamedWindowProperty[] namedWindowProperties,
- boolean generateUpdateBefore,
- boolean needRetraction,
- List<InputProperty> inputProperties,
- RowType outputType,
- String description) {
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+ @JsonProperty(FIELD_NAME_GROUPING) int[] grouping,
+ @JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls,
+ @JsonProperty(FIELD_NAME_WINDOW) LogicalWindow window,
+ @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
+ NamedWindowProperty[] namedWindowProperties,
+ @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean generateUpdateBefore,
+ @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, context, persistedConfig, inputProperties, outputType, description);
checkArgument(inputProperties.size() == 1);
this.grouping = checkNotNull(grouping);
@@ -390,8 +416,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
shiftTimeZone);
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationName(config),
- createTransformationDescription(config),
+ createTransformationMeta(PYTHON_GROUP_WINDOW_AGGREGATE_TRANSFORMATION, config),
pythonOperator,
InternalTypeInfo.of(outputRowType),
inputTransform.getParallelism());
@@ -431,8 +456,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationName(config),
- createTransformationDescription(config),
+ createTransformationMeta(PYTHON_GROUP_WINDOW_AGGREGATE_TRANSFORMATION, config),
pythonOperator,
InternalTypeInfo.of(outputRowType),
inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
index 626cd38b8ec..fd507b97d4a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -37,6 +38,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec;
@@ -51,6 +53,9 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rel.core.AggregateCall;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,11 +72,22 @@ import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Stream {@link ExecNode} for python time-based over operator. */
+@ExecNodeMetadata(
+ name = "stream-exec-python-over-aggregate",
+ version = 1,
+ producedTransformations =
+ StreamExecPythonOverAggregate.PYTHON_OVER_AGGREGATE_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_16,
+ minStateVersion = FlinkVersion.v1_16)
public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
private static final Logger LOG = LoggerFactory.getLogger(StreamExecPythonOverAggregate.class);
+ public static final String PYTHON_OVER_AGGREGATE_TRANSFORMATION = "python-over-aggregate";
+
+ public static final String FIELD_NAME_OVER_SPEC = "overSpec";
+
private static final String
ARROW_PYTHON_OVER_WINDOW_RANGE_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME =
"org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream."
@@ -89,6 +105,7 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
"org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream."
+ "StreamArrowPythonProcTimeBoundedRowsOperator";
+ @JsonProperty(FIELD_NAME_OVER_SPEC)
private final OverSpec overSpec;
public StreamExecPythonOverAggregate(
@@ -108,14 +125,15 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
description);
}
+ @JsonCreator
public StreamExecPythonOverAggregate(
- int id,
- ExecNodeContext context,
- ReadableConfig persistedConfig,
- OverSpec overSpec,
- List<InputProperty> inputProperties,
- RowType outputType,
- String description) {
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+ @JsonProperty(FIELD_NAME_OVER_SPEC) OverSpec overSpec,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, context, persistedConfig, inputProperties, outputType, description);
checkArgument(inputProperties.size() == 1);
this.overSpec = checkNotNull(overSpec);
@@ -244,8 +262,7 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationName(config),
- createTransformationDescription(config),
+ createTransformationMeta(PYTHON_OVER_AGGREGATE_TRANSFORMATION, config),
pythonOperator,
InternalTypeInfo.of(outputRowType),
inputTransform.getParallelism());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 2da4fffb1c4..1743d645a7b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -138,6 +138,11 @@ public final class ExecNodeMetadataUtil {
add(StreamExecWindowJoin.class);
add(StreamExecWindowRank.class);
add(StreamExecWindowTableFunction.class);
+ add(StreamExecPythonCalc.class);
+ add(StreamExecPythonCorrelate.class);
+ add(StreamExecPythonGroupAggregate.class);
+ add(StreamExecPythonGroupWindowAggregate.class);
+ add(StreamExecPythonOverAggregate.class);
}
};
@@ -158,11 +163,6 @@ public final class ExecNodeMetadataUtil {
add(StreamExecLegacyTableSourceScan.class);
add(StreamExecLegacySink.class);
add(StreamExecGroupTableAggregate.class);
- add(StreamExecPythonCalc.class);
- add(StreamExecPythonCorrelate.class);
- add(StreamExecPythonGroupAggregate.class);
- add(StreamExecPythonGroupWindowAggregate.class);
- add(StreamExecPythonOverAggregate.class);
add(StreamExecPythonGroupTableAggregate.class);
add(StreamExecSort.class);
add(StreamExecMultipleInput.class);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
new file mode 100644
index 00000000000..07a367d6d24
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.BooleanPythonScalarFunction;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for calc. */
+public class PythonCalcJsonPlanTest extends TableTestBase {
+
+ private StreamTableTestUtil util;
+ private TableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ util = streamTestUtil(TableConfig.getDefault());
+ tEnv = util.getTableEnv();
+
+ String srcTableDdl =
+ "CREATE TABLE MyTable (\n"
+ + " a bigint,\n"
+ + " b int not null,\n"
+ + " c varchar,\n"
+ + " d timestamp(3)\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'false')";
+ tEnv.executeSql(srcTableDdl);
+ }
+
+ @Test
+ public void testPythonCalc() {
+ tEnv.createTemporaryFunction("pyFunc", new PythonScalarFunction("pyFunc"));
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " b int\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+ util.verifyJsonPlan("insert into MySink select a, pyFunc(b, b) from MyTable");
+ }
+
+ @Test
+ public void testPythonFunctionInWhereClause() {
+ tEnv.createTemporaryFunction("pyFunc", new BooleanPythonScalarFunction("pyFunc"));
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " b int\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+ util.verifyJsonPlan("insert into MySink select a, b from MyTable where pyFunc(b, b + 1)");
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java
new file mode 100644
index 00000000000..456df0dd0ed
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
+import org.apache.flink.table.planner.utils.MockPythonTableFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for correlate. */
+public class PythonCorrelateJsonPlanTest extends TableTestBase {
+ private StreamTableTestUtil util;
+ private TableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ TableConfig tableConfig = TableConfig.getDefault();
+ util = streamTestUtil(tableConfig);
+ tEnv = util.getTableEnv();
+ tEnv.createTemporaryFunction("TableFunc", new MockPythonTableFunction());
+ tEnv.createTemporaryFunction("pyFunc", new PythonScalarFunction("pyFunc"));
+
+ String srcTableDdl =
+ "CREATE TABLE MyTable (\n"
+ + " a int,\n"
+ + " b int,\n"
+ + " c int,\n"
+ + " d timestamp(3)\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'false')";
+ tEnv.executeSql(srcTableDdl);
+ }
+
+ @Test
+ public void testPythonTableFunction() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a int,\n"
+ + " b int\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+
+ String sqlQuery =
+ "INSERT INTO MySink SELECT x, y FROM MyTable "
+ + "LEFT JOIN LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) ON TRUE";
+ util.verifyJsonPlan(sqlQuery);
+ }
+
+ @Test
+ public void testJoinWithFilter() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a int,\n"
+ + " b int\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+
+ String sqlQuery =
+ "INSERT INTO MySink SELECT x, y FROM MyTable, "
+ + "LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) WHERE x = a and y + 1 = y * y";
+ util.verifyJsonPlan(sqlQuery);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest.java
new file mode 100644
index 00000000000..5b50a2597cb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.TestPythonAggregateFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for group aggregate. */
+public class PythonGroupAggregateJsonPlanTest extends TableTestBase {
+
+ private StreamTableTestUtil util;
+ private TableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ util = streamTestUtil(TableConfig.getDefault());
+ tEnv = util.getTableEnv();
+
+ String srcTableDdl =
+ "CREATE TABLE MyTable (\n"
+ + " a int not null,\n"
+ + " b int not null,\n"
+ + " c int not null,\n"
+ + " d bigint\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'false')";
+ tEnv.executeSql(srcTableDdl);
+ tEnv.createTemporarySystemFunction("pyFunc", new TestPythonAggregateFunction());
+ }
+
+ @Test
+ public void tesPythonAggCallsWithGroupBy() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " b bigint\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'sink-insert-only' = 'false',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+ util.verifyJsonPlan(
+ "insert into MySink select b, "
+ + "pyFunc(a, c) filter (where b > 1) "
+ + "from MyTable group by b");
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest.java
new file mode 100644
index 00000000000..afcd4a86fe1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.TestPythonAggregateFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for group window aggregate. */
+public class PythonGroupWindowAggregateJsonPlanTest extends TableTestBase {
+ private StreamTableTestUtil util;
+ private TableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ util = streamTestUtil(TableConfig.getDefault());
+ tEnv = util.getTableEnv();
+
+ String srcTableDdl =
+ "CREATE TABLE MyTable (\n"
+ + " a INT NOT NULL,\n"
+ + " b BIGINT,\n"
+ + " c VARCHAR,\n"
+ + " `rowtime` AS TO_TIMESTAMP(c),\n"
+ + " proctime as PROCTIME(),\n"
+ + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values')\n";
+ tEnv.executeSql(srcTableDdl);
+ tEnv.createTemporarySystemFunction("pyFunc", new TestPythonAggregateFunction());
+ }
+
+ @Test
+ public void testEventTimeTumbleWindow() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " b BIGINT,\n"
+ + " window_start TIMESTAMP(3),\n"
+ + " window_end TIMESTAMP(3),\n"
+ + " c BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values')\n";
+ tEnv.executeSql(sinkTableDdl);
+ util.verifyJsonPlan(
+ "insert into MySink select\n"
+ + " b,\n"
+ + " TUMBLE_START(rowtime, INTERVAL '5' SECOND) as window_start,\n"
+ + " TUMBLE_END(rowtime, INTERVAL '5' SECOND) as window_end,\n"
+ + " pyFunc(a, a + 1)\n"
+ + "FROM MyTable\n"
+ + "GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)");
+ }
+
+ @Test
+ public void testProcTimeTumbleWindow() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " b BIGINT,\n"
+ + " window_end TIMESTAMP(3),\n"
+ + " c BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values')\n";
+ tEnv.executeSql(sinkTableDdl);
+ util.verifyJsonPlan(
+ "insert into MySink select\n"
+ + " b,\n"
+ + " TUMBLE_END(proctime, INTERVAL '15' MINUTE) as window_end,\n"
+ + " pyFunc(a, a + 1)\n"
+ + "FROM MyTable\n"
+ + "GROUP BY b, TUMBLE(proctime, INTERVAL '15' MINUTE)");
+ }
+
+ @Test
+ public void testEventTimeHopWindow() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " b BIGINT,\n"
+ + " c BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values')\n";
+ tEnv.executeSql(sinkTableDdl);
+ util.verifyJsonPlan(
+ "insert into MySink select\n"
+ + " b,\n"
+ + " pyFunc(a, a + 1)\n"
+ + "FROM MyTable\n"
+ + "GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)");
+ }
+
+ @Test
+ public void testProcTimeHopWindow() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " b BIGINT,\n"
+ + " c BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values')\n";
+ tEnv.executeSql(sinkTableDdl);
+ util.verifyJsonPlan(
+ "insert into MySink select\n"
+ + " b,\n"
+ + " pyFunc(a, a + 1)\n"
+ + "FROM MyTable\n"
+ + "GROUP BY b, HOP(proctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)");
+ }
+
+ @Test
+ public void testEventTimeSessionWindow() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " b BIGINT,\n"
+ + " c BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values')\n";
+ tEnv.executeSql(sinkTableDdl);
+ util.verifyJsonPlan(
+ "insert into MySink select\n"
+ + " b,\n"
+ + " pyFunc(a, a + 1)\n"
+ + "FROM MyTable\n"
+ + "GROUP BY b, Session(rowtime, INTERVAL '10' SECOND)");
+ }
+
+ @Test
+ public void testProcTimeSessionWindow() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " b BIGINT,\n"
+ + " c BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values')\n";
+ tEnv.executeSql(sinkTableDdl);
+ util.verifyJsonPlan(
+ "insert into MySink select\n"
+ + " b,\n"
+ + " pyFunc(a, a + 1)\n"
+ + "FROM MyTable\n"
+ + "GROUP BY b, Session(proctime, INTERVAL '10' MINUTE)");
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest.java
new file mode 100644
index 00000000000..ec5f4e3f967
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.PandasAggregateFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization for over aggregate. */
+public class PythonOverAggregateJsonPlanTest extends TableTestBase {
+ private StreamTableTestUtil util;
+ private TableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ util = streamTestUtil(TableConfig.getDefault());
+ tEnv = util.getTableEnv();
+ String srcTableDdl =
+ "CREATE TABLE MyTable (\n"
+ + " a int,\n"
+ + " b varchar,\n"
+ + " c int not null,\n"
+ + " rowtime timestamp(3),\n"
+ + " proctime as PROCTIME(),\n"
+ + " watermark for rowtime as rowtime"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'false')";
+ tEnv.executeSql(srcTableDdl);
+ tEnv.createTemporarySystemFunction("pyFunc", new PandasAggregateFunction());
+ }
+
+ @Test
+ public void testProcTimeBoundedPartitionedRangeOver() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " b bigint\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'sink-insert-only' = 'false',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+ String sql =
+ "insert into MySink SELECT a,\n"
+ + " pyFunc(c, c) OVER (PARTITION BY a ORDER BY proctime\n"
+ + " RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW)\n"
+ + "FROM MyTable";
+ util.verifyJsonPlan(sql);
+ }
+
+ @Test
+ public void testProcTimeBoundedNonPartitionedRangeOver() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " b bigint\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'sink-insert-only' = 'false',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+ String sql =
+ "insert into MySink SELECT a,\n"
+ + " pyFunc(c, c) OVER (ORDER BY proctime\n"
+ + " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)\n"
+ + " FROM MyTable";
+ util.verifyJsonPlan(sql);
+ }
+
+ @Test
+ public void testProcTimeUnboundedPartitionedRangeOver() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " b bigint\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'sink-insert-only' = 'false',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+ String sql =
+ "insert into MySink SELECT a,\n"
+ + " pyFunc(c, c) OVER (PARTITION BY a ORDER BY proctime RANGE UNBOUNDED PRECEDING)\n"
+ + "FROM MyTable";
+ util.verifyJsonPlan(sql);
+ }
+
+ @Test
+ public void testRowTimeBoundedPartitionedRowsOver() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " b bigint\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'sink-insert-only' = 'false',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+ String sql =
+ "insert into MySink SELECT a,\n"
+ + " pyFunc(c, c) OVER (PARTITION BY a ORDER BY rowtime\n"
+ + " ROWS BETWEEN 5 preceding AND CURRENT ROW)\n"
+ + "FROM MyTable";
+ util.verifyJsonPlan(sql);
+ }
+
+ @Test
+ public void testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " b bigint\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'sink-insert-only' = 'false',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+ String sql =
+ "insert into MySink SELECT a, "
+ + " pyFunc(c, c) OVER ("
+ + " PARTITION BY a ORDER BY proctime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) "
+ + "FROM MyTable";
+ util.verifyJsonPlan(sql);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
new file mode 100644
index 00000000000..331f7054703
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
@@ -0,0 +1,133 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "d",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 1 ] ],
+ "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-python-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>",
+ "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "table-sink-class" : "DEFAULT"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out
new file mode 100644
index 00000000000..bca1175329b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out
@@ -0,0 +1,216 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "d",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ }, {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 1 ] ],
+ "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$+$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `f0` INT NOT NULL>",
+ "description" : "Calc(select=[a, b, (b + 1) AS f0])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-python-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `f0` BOOLEAN NOT NULL>",
+ "description" : "PythonCalc(select=[a, b, pyFunc(b, f0) AS f0])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "BOOLEAN NOT NULL"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+ "description" : "Calc(select=[a, b], where=[f0])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "table-sink-class" : "DEFAULT"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
new file mode 100644
index 00000000000..91a1a1ced71
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
@@ -0,0 +1,264 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "INT"
+ }, {
+ "name" : "d",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$*$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "INT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3), `f0` INT>",
+ "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-python-correlate_1",
+ "joinType" : "INNER",
+ "functionCall" : {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`TableFunc`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "ROW<`x` INT, `y` INT> NOT NULL"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3), `f0` INT, `x` INT, `y` INT>",
+ "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,x,y], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER x, INTEGER y)], joinType=[INNER])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : "INT"
+ } ],
+ "condition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$AND$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$+$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : "INT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$*$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : "INT"
+ } ],
+ "type" : "INT"
+ } ],
+ "type" : "BOOLEAN"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BOOLEAN"
+ } ],
+ "type" : "BOOLEAN"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`x` INT, `y` INT>",
+ "description" : "Calc(select=[x, y], where=[(((y + 1) = (y * y)) AND (x = a))])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "table-sink-class" : "DEFAULT"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`x` INT, `y` INT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
new file mode 100644
index 00000000000..137419d1818
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
@@ -0,0 +1,210 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "INT"
+ }, {
+ "name" : "d",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$*$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "INT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3), `f0` INT>",
+ "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-python-correlate_1",
+ "joinType" : "LEFT",
+ "functionCall" : {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`TableFunc`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "ROW<`x` INT, `y` INT> NOT NULL"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` INT, `c` INT, `d` TIMESTAMP(3), `f0` INT, `x` INT, `y` INT>",
+ "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,x,y], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER x, INTEGER y)], joinType=[LEFT])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : "INT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`x` INT, `y` INT>",
+ "description" : "Calc(select=[x, y])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "table-sink-class" : "DEFAULT"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`x` INT, `y` INT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out
new file mode 100644
index 00000000000..3a983fd2938
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out
@@ -0,0 +1,232 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "d",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 1 ], [ 0 ], [ 2 ] ],
+ "producedType" : "ROW<`b` INT NOT NULL, `a` INT NOT NULL, `c` INT NOT NULL> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`b` INT NOT NULL, `a` INT NOT NULL, `c` INT NOT NULL> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`b` INT NOT NULL, `a` INT NOT NULL, `c` INT NOT NULL>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[b, a, c], metadata=[]]], fields=[b, a, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` INT NOT NULL, `a` INT NOT NULL, `c` INT NOT NULL, `$f3` BOOLEAN NOT NULL>",
+ "description" : "Calc(select=[b, a, c, (b > 1) AS $f3])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` INT NOT NULL, `a` INT NOT NULL, `c` INT NOT NULL, `$f3` BOOLEAN NOT NULL>",
+ "description" : "Exchange(distribution=[hash[b]])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-python-group-aggregate_1",
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "EXPR$1",
+ "systemName" : "pyFunc",
+ "argList" : [ 1, 2 ],
+ "filterArg" : 3,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ],
+ "aggCallNeedRetractions" : [ false ],
+ "generateUpdateBefore" : true,
+ "needRetraction" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` INT NOT NULL, `EXPR$1` BIGINT>",
+ "description" : "PythonGroupAggregate(groupBy=[b], select=[b, pyFunc(a, c) FILTER $f3 AS EXPR$1])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` BIGINT>",
+ "description" : "Calc(select=[CAST(b AS BIGINT) AS a, EXPR$1 AS b])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "sink-insert-only" : "false",
+ "table-sink-class" : "DEFAULT",
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
new file mode 100644
index 00000000000..4eb78b8425c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
@@ -0,0 +1,390 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`c`)"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `rowtime` TIMESTAMP(3), `a` INT NOT NULL>",
+ "description" : "Calc(select=[b, TO_TIMESTAMP(c) AS rowtime, a])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 1,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$+$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Calc(select=[b, rowtime, a, (a + 1) AS $f3])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[b]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-python-group-window-aggregate_1",
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "EXPR$1",
+ "systemName" : "pyFunc",
+ "argList" : [ 2, 3 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ],
+ "window" : {
+ "kind" : "SLIDING",
+ "alias" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ },
+ "timeField" : {
+ "fieldName" : "rowtime",
+ "fieldIndex" : 1,
+ "inputIndex" : 0,
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ },
+ "isTimeWindow" : true,
+ "size" : "PT10S",
+ "slide" : "PT5S"
+ },
+ "namedWindowProperties" : [ ],
+ "generateUpdateBefore" : false,
+ "needRetraction" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 10000, 5000)], select=[b, pyFunc(a, $f3) AS EXPR$1])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
new file mode 100644
index 00000000000..0b404642a70
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
@@ -0,0 +1,388 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`c`)"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `rowtime` TIMESTAMP(3), `a` INT NOT NULL>",
+ "description" : "Calc(select=[b, TO_TIMESTAMP(c) AS rowtime, a])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 1,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$+$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Calc(select=[b, rowtime, a, (a + 1) AS $f3])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[b]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-python-group-window-aggregate_1",
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "EXPR$1",
+ "systemName" : "pyFunc",
+ "argList" : [ 2, 3 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ],
+ "window" : {
+ "kind" : "SESSION",
+ "alias" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ },
+ "timeField" : {
+ "fieldName" : "rowtime",
+ "fieldIndex" : 1,
+ "inputIndex" : 0,
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ },
+ "gap" : "PT10S"
+ },
+ "namedWindowProperties" : [ ],
+ "generateUpdateBefore" : false,
+ "needRetraction" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SessionGroupWindow('w$, rowtime, 10000)], select=[b, pyFunc(a, $f3) AS EXPR$1])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
new file mode 100644
index 00000000000..5adfce323c1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -0,0 +1,513 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`c`)"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `rowtime` TIMESTAMP(3), `a` INT NOT NULL>",
+ "description" : "Calc(select=[b, TO_TIMESTAMP(c) AS rowtime, a])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 1,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$+$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Calc(select=[b, rowtime, a, (a + 1) AS $f3])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[b]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-python-group-window-aggregate_1",
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "EXPR$3",
+ "systemName" : "pyFunc",
+ "argList" : [ 2, 3 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ],
+ "window" : {
+ "kind" : "TUMBLING",
+ "alias" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ },
+ "timeField" : {
+ "fieldName" : "rowtime",
+ "fieldIndex" : 1,
+ "inputIndex" : 0,
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ },
+ "isTimeWindow" : true,
+ "size" : "PT5S"
+ },
+ "namedWindowProperties" : [ {
+ "name" : "w$start",
+ "property" : {
+ "kind" : "WindowStart",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }
+ }
+ }, {
+ "name" : "w$end",
+ "property" : {
+ "kind" : "WindowEnd",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }
+ }
+ }, {
+ "name" : "w$rowtime",
+ "property" : {
+ "kind" : "Rowtime",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }
+ }
+ }, {
+ "name" : "w$proctime",
+ "property" : {
+ "kind" : "Proctime",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }
+ }
+ } ],
+ "generateUpdateBefore" : false,
+ "needRetraction" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "EXPR$3",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "w$start",
+ "fieldType" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "name" : "w$end",
+ "fieldType" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "name" : "w$rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "w$proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, pyFunc(a, $f3) AS EXPR$3, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$3` BIGINT>",
+ "description" : "Calc(select=[b, w$start AS window_start, w$end AS window_end, EXPR$3])"
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "window_start",
+ "dataType" : "TIMESTAMP(3)"
+ }, {
+ "name" : "window_end",
+ "dataType" : "TIMESTAMP(3)"
+ }, {
+ "name" : "c",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$3` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, window_start, window_end, EXPR$3])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
new file mode 100644
index 00000000000..f0e145c90e1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
@@ -0,0 +1,443 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`c`)"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : "TIMESTAMP(3)"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$+$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Calc(select=[b, proctime, a, (a + 1) AS $f3])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[b]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-python-group-window-aggregate_1",
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "EXPR$1",
+ "systemName" : "pyFunc",
+ "argList" : [ 2, 3 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ],
+ "window" : {
+ "kind" : "SLIDING",
+ "alias" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "timeField" : {
+ "fieldName" : "proctime",
+ "fieldIndex" : 1,
+ "inputIndex" : 0,
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "isTimeWindow" : true,
+ "size" : "PT10M",
+ "slide" : "PT5M"
+ },
+ "namedWindowProperties" : [ ],
+ "generateUpdateBefore" : false,
+ "needRetraction" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, proctime, 600000, 300000)], select=[b, pyFunc(a, $f3) AS EXPR$1])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
new file mode 100644
index 00000000000..cb5438d8ec5
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
@@ -0,0 +1,441 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`c`)"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : "TIMESTAMP(3)"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$+$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Calc(select=[b, proctime, a, (a + 1) AS $f3])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[b]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-python-group-window-aggregate_1",
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "EXPR$1",
+ "systemName" : "pyFunc",
+ "argList" : [ 2, 3 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ],
+ "window" : {
+ "kind" : "SESSION",
+ "alias" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "timeField" : {
+ "fieldName" : "proctime",
+ "fieldIndex" : 1,
+ "inputIndex" : 0,
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "gap" : "PT10M"
+ },
+ "namedWindowProperties" : [ ],
+ "generateUpdateBefore" : false,
+ "needRetraction" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SessionGroupWindow('w$, proctime, 600000)], select=[b, pyFunc(a, $f3) AS EXPR$1])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `EXPR$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
new file mode 100644
index 00000000000..08990c9201e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
@@ -0,0 +1,542 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`c`)"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime` - INTERVAL '1' SECOND"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : "TIMESTAMP(3)"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$+$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Calc(select=[b, proctime, a, (a + 1) AS $f3])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$f3",
+ "fieldType" : "INT NOT NULL"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[b]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-python-group-window-aggregate_1",
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "EXPR$2",
+ "systemName" : "pyFunc",
+ "argList" : [ 2, 3 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ],
+ "window" : {
+ "kind" : "TUMBLING",
+ "alias" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "timeField" : {
+ "fieldName" : "proctime",
+ "fieldIndex" : 1,
+ "inputIndex" : 0,
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "isTimeWindow" : true,
+ "size" : "PT15M"
+ },
+ "namedWindowProperties" : [ {
+ "name" : "w$start",
+ "property" : {
+ "kind" : "WindowStart",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }
+ }
+ }, {
+ "name" : "w$end",
+ "property" : {
+ "kind" : "WindowEnd",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }
+ }
+ }, {
+ "name" : "w$proctime",
+ "property" : {
+ "kind" : "Proctime",
+ "reference" : {
+ "name" : "w$",
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }
+ }
+ } ],
+ "generateUpdateBefore" : false,
+ "needRetraction" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "EXPR$2",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "w$start",
+ "fieldType" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "name" : "w$end",
+ "fieldType" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "name" : "w$proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, proctime, 900000)], properties=[w$start, w$end, w$proctime], select=[b, pyFunc(a, $f3) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$2` BIGINT>",
+ "description" : "Calc(select=[b, w$end AS window_end, EXPR$2])"
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "window_end",
+ "dataType" : "TIMESTAMP(3)"
+ }, {
+ "name" : "c",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `window_end` TIMESTAMP(3) NOT NULL, `EXPR$2` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, window_end, EXPR$2])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
new file mode 100644
index 00000000000..acf9749ab4e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
@@ -0,0 +1,448 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "rowtime",
+ "dataType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime`"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 2 ], [ 0 ], [ 3 ] ],
+ "producedType" : "ROW<`c` INT NOT NULL, `a` INT, `rowtime` TIMESTAMP(3)> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`c` INT NOT NULL, `a` INT, `rowtime` TIMESTAMP(3)> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`c` INT NOT NULL, `a` INT, `rowtime` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[c, a, rowtime], metadata=[]]], fields=[c, a, rowtime])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : "TIMESTAMP(3)"
+ } ]
+ },
+ "description" : "Calc(select=[c, PROCTIME() AS proctime, a, rowtime])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "$2",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "Calc(select=[c, proctime, CAST(a AS BIGINT) AS $2])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "$2",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-python-over-aggregate_1",
+ "overSpec" : {
+ "partition" : {
+ "fields" : [ ]
+ },
+ "groups" : [ {
+ "orderBy" : {
+ "fields" : [ {
+ "index" : 1,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "isRows" : false,
+ "lowerBound" : {
+ "kind" : "BOUNDED_WINDOW",
+ "isPreceding" : true,
+ "offset" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ }
+ },
+ "upperBound" : {
+ "kind" : "CURRENT_ROW"
+ },
+ "aggCalls" : [ {
+ "name" : "w0$o0",
+ "systemName" : "pyFunc",
+ "argList" : [ 0, 0 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ]
+ } ],
+ "constants" : [ {
+ "kind" : "LITERAL",
+ "value" : "10000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "originalInputFields" : 3
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "$2",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "w0$o0",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "PythonOverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[c, proctime, $2, pyFunc(c, c) AS w0$o0])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+ "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])"
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "sink-insert-only" : "false",
+ "table-sink-class" : "DEFAULT",
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
new file mode 100644
index 00000000000..28c139943f9
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
@@ -0,0 +1,462 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "rowtime",
+ "dataType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime`"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ],
+ "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "rowtime",
+ "fieldType" : "TIMESTAMP(3)"
+ } ]
+ },
+ "description" : "Calc(select=[a, c, PROCTIME() AS proctime, rowtime])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "$3",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "Calc(select=[a, c, proctime, CAST(a AS BIGINT) AS $3])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "$3",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[a]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-python-over-aggregate_1",
+ "overSpec" : {
+ "partition" : {
+ "fields" : [ 0 ]
+ },
+ "groups" : [ {
+ "orderBy" : {
+ "fields" : [ {
+ "index" : 2,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "isRows" : false,
+ "lowerBound" : {
+ "kind" : "BOUNDED_WINDOW",
+ "isPreceding" : true,
+ "offset" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "INTERVAL HOUR NOT NULL"
+ }
+ },
+ "upperBound" : {
+ "kind" : "CURRENT_ROW"
+ },
+ "aggCalls" : [ {
+ "name" : "w0$o0",
+ "systemName" : "pyFunc",
+ "argList" : [ 1, 1 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ]
+ } ],
+ "constants" : [ {
+ "kind" : "LITERAL",
+ "value" : "7200000",
+ "type" : "INTERVAL HOUR NOT NULL"
+ } ],
+ "originalInputFields" : 4
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "$3",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "w0$o0",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, pyFunc(c, c) AS w0$o0])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+ "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])"
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "sink-insert-only" : "false",
+ "table-sink-class" : "DEFAULT",
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
new file mode 100644
index 00000000000..2a37b63d4df
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
@@ -0,0 +1,392 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "rowtime",
+ "dataType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime`"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ],
+ "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 2,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$2",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "$3",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "Calc(select=[a, c, CAST(a AS BIGINT) AS $2, PROCTIME() AS $3])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$2",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "$3",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[a]])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-python-over-aggregate_1",
+ "overSpec" : {
+ "partition" : {
+ "fields" : [ 0 ]
+ },
+ "groups" : [ {
+ "orderBy" : {
+ "fields" : [ {
+ "index" : 3,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "isRows" : true,
+ "lowerBound" : {
+ "kind" : "BOUNDED_WINDOW",
+ "isPreceding" : true,
+ "offset" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "INT NOT NULL"
+ }
+ },
+ "upperBound" : {
+ "kind" : "CURRENT_ROW"
+ },
+ "aggCalls" : [ {
+ "name" : "w0$o0",
+ "systemName" : "pyFunc",
+ "argList" : [ 1, 1 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ]
+ } ],
+ "constants" : [ {
+ "kind" : "LITERAL",
+ "value" : "4",
+ "type" : "INT NOT NULL"
+ } ],
+ "originalInputFields" : 4
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "$2",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "$3",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "w0$o0",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[$3 ASC], window=[ ROWS BETWEEN 4 PRECEDING AND CURRENT ROW], select=[a, c, $2, $3, pyFunc(c, c) AS w0$o0])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+ "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "sink-insert-only" : "false",
+ "table-sink-class" : "DEFAULT",
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
new file mode 100644
index 00000000000..0b95247d2c4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
@@ -0,0 +1,452 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "rowtime",
+ "dataType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime`"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ],
+ "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "rowtime",
+ "fieldType" : "TIMESTAMP(3)"
+ } ]
+ },
+ "description" : "Calc(select=[a, c, PROCTIME() AS proctime, rowtime])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "$3",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "Calc(select=[a, c, proctime, CAST(a AS BIGINT) AS $3])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "$3",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[a]])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-python-over-aggregate_1",
+ "overSpec" : {
+ "partition" : {
+ "fields" : [ 0 ]
+ },
+ "groups" : [ {
+ "orderBy" : {
+ "fields" : [ {
+ "index" : 2,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "isRows" : false,
+ "lowerBound" : {
+ "kind" : "UNBOUNDED_PRECEDING"
+ },
+ "upperBound" : {
+ "kind" : "CURRENT_ROW"
+ },
+ "aggCalls" : [ {
+ "name" : "w0$o0",
+ "systemName" : "pyFunc",
+ "argList" : [ 1, 1 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ]
+ } ],
+ "constants" : [ ],
+ "originalInputFields" : 4
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "proctime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ }, {
+ "name" : "$3",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "w0$o0",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, pyFunc(c, c) AS w0$o0])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+ "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])"
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "sink-insert-only" : "false",
+ "table-sink-class" : "DEFAULT",
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
new file mode 100644
index 00000000000..d7e652d3bb1
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
@@ -0,0 +1,387 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "rowtime",
+ "dataType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "proctime",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$PROCTIME$1",
+ "operands" : [ ],
+ "type" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "PROCTIME"
+ }
+ },
+ "serializableString" : "PROCTIME()"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "rowtime",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`rowtime`"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 2 ], [ 3 ] ],
+ "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`a` INT, `c` INT NOT NULL, `rowtime` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 2,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "$3",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "Calc(select=[a, c, rowtime, CAST(a AS BIGINT) AS $3])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "$3",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[a]])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-python-over-aggregate_1",
+ "overSpec" : {
+ "partition" : {
+ "fields" : [ 0 ]
+ },
+ "groups" : [ {
+ "orderBy" : {
+ "fields" : [ {
+ "index" : 2,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "isRows" : true,
+ "lowerBound" : {
+ "kind" : "BOUNDED_WINDOW",
+ "isPreceding" : true,
+ "offset" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "INT NOT NULL"
+ }
+ },
+ "upperBound" : {
+ "kind" : "CURRENT_ROW"
+ },
+ "aggCalls" : [ {
+ "name" : "w0$o0",
+ "systemName" : "pyFunc",
+ "argList" : [ 1, 1 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT"
+ } ]
+ } ],
+ "constants" : [ {
+ "kind" : "LITERAL",
+ "value" : "5",
+ "type" : "INT NOT NULL"
+ } ],
+ "originalInputFields" : 4
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "a",
+ "fieldType" : "INT"
+ }, {
+ "name" : "c",
+ "fieldType" : "INT NOT NULL"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "$3",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "w0$o0",
+ "fieldType" : "BIGINT"
+ } ]
+ },
+ "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, pyFunc(c, c) AS w0$o0])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+ "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "sink-insert-only" : "false",
+ "table-sink-class" : "DEFAULT",
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`$0` BIGINT, `$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[$0, $1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file