You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2021/05/17 13:53:37 UTC

[flink] branch master updated: [FLINK-22650][python][table-planner-blink] Support StreamExecPythonCorrelate json serialization/deserialization

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d09745a  [FLINK-22650][python][table-planner-blink] Support StreamExecPythonCorrelate json serialization/deserialization
d09745a is described below

commit d09745a7341669731ed0bc1662f3eb16b5e56e2a
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Sat May 15 15:24:43 2021 +0800

    [FLINK-22650][python][table-planner-blink] Support StreamExecPythonCorrelate json serialization/deserialization
    
    This closes #15922.
---
 flink-python/pyflink/table/tests/test_udtf.py      |  50 ++-
 .../nodes/exec/batch/BatchExecPythonCorrelate.java |  12 +-
 .../exec/common/CommonExecPythonCorrelate.java     |  26 +-
 .../exec/stream/StreamExecPythonCorrelate.java     |  28 +-
 .../rules/logical/PythonCorrelateSplitRule.java    |  66 +++-
 .../batch/BatchPhysicalPythonCorrelate.scala       |  10 +-
 .../stream/StreamPhysicalPythonCorrelate.scala     |  13 +-
 .../plan/rules/logical/PythonCalcSplitRule.scala   |  10 +-
 .../nodes/exec/stream/JsonSerdeCoverageTest.java   |   1 -
 .../exec/stream/PythonCorrelateJsonPlanTest.java   |  92 +++++
 .../testJoinWithFilter.out                         | 431 +++++++++++++++++++++
 .../testPythonTableFunction.out                    | 329 ++++++++++++++++
 .../CalcPythonCorrelateTransposeRuleTest.xml       |   8 +-
 .../rules/logical/PythonCorrelateSplitRuleTest.xml |  17 +-
 14 files changed, 1048 insertions(+), 45 deletions(-)

diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 98ca8f5..976b2f6 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -78,7 +78,55 @@ class PyFlinkStreamUserDefinedTableFunctionTests(UserDefinedTableFunctionTests,
 
 class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
                                                  PyFlinkBlinkStreamTableTestCase):
-    pass
+    def test_execute_from_json_plan(self):
+        # create source file path
+        tmp_dir = self.tempdir
+        data = ['1,1', '3,2', '2,1']
+        source_path = tmp_dir + '/test_execute_from_json_plan_input.csv'
+        sink_path = tmp_dir + '/test_execute_from_json_plan_out'
+        with open(source_path, 'w') as fd:
+            for ele in data:
+                fd.write(ele + '\n')
+
+        source_table = """
+            CREATE TABLE source_table (
+                a BIGINT,
+                b BIGINT
+            ) WITH (
+                'connector' = 'filesystem',
+                'path' = '%s',
+                'format' = 'csv'
+            )
+        """ % source_path
+        self.t_env.execute_sql(source_table)
+
+        self.t_env.execute_sql("""
+            CREATE TABLE sink_table (
+                a BIGINT,
+                b BIGINT,
+                c BIGINT
+            ) WITH (
+                'connector' = 'filesystem',
+                'path' = '%s',
+                'format' = 'csv'
+            )
+        """ % sink_path)
+
+        self.t_env.create_temporary_system_function(
+            "multi_emit", udtf(MultiEmit(), result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()]))
+
+        json_plan = self.t_env._j_tenv.getJsonPlan("INSERT INTO sink_table "
+                                                   "SELECT a, x, y FROM source_table "
+                                                   "LEFT JOIN LATERAL TABLE(multi_emit(a, b))"
+                                                   " as T(x, y)"
+                                                   " ON TRUE")
+        from py4j.java_gateway import get_method
+        get_method(self.t_env._j_tenv.executeJsonPlan(json_plan), "await")()
+
+        import glob
+        lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
+        lines.sort()
+        self.assertEqual(lines, ['1,1,0', '2,2,0', '3,3,0', '3,3,1'])
 
 
 class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedTableFunctionTests,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java
index 9e4945f..4372ff9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java
@@ -25,7 +25,8 @@ import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexNode;
+
+import java.util.Collections;
 
 /** Batch exec node which matches along with join a Python user defined table function. */
 public class BatchExecPythonCorrelate extends CommonExecPythonCorrelate
@@ -34,10 +35,15 @@ public class BatchExecPythonCorrelate extends CommonExecPythonCorrelate
     public BatchExecPythonCorrelate(
             FlinkJoinType joinType,
             RexCall invocation,
-            RexNode condition,
             InputProperty inputProperty,
             RowType outputType,
             String description) {
-        super(joinType, invocation, condition, inputProperty, outputType, description);
+        super(
+                joinType,
+                invocation,
+                getNewNodeId(),
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
     }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
index 12720d1..a5492d2 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
@@ -38,37 +38,47 @@ import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 
 import java.lang.reflect.Constructor;
-import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Base {@link ExecNode} which matches along with join a Python user defined table function. */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
         implements SingleTransformationTranslator<RowData> {
+
+    public static final String FIELD_NAME_JOIN_TYPE = "joinType";
+    public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
+
     private static final String PYTHON_TABLE_FUNCTION_OPERATOR_NAME =
             "org.apache.flink.table.runtime.operators.python.table.RowDataPythonTableFunctionOperator";
 
+    @JsonProperty(FIELD_NAME_JOIN_TYPE)
     private final FlinkJoinType joinType;
+
+    @JsonProperty(FIELD_NAME_FUNCTION_CALL)
     private final RexCall invocation;
 
     public CommonExecPythonCorrelate(
             FlinkJoinType joinType,
             RexCall invocation,
-            RexNode condition,
-            InputProperty inputProperty,
+            int id,
+            List<InputProperty> inputProperties,
             RowType outputType,
             String description) {
-        super(Collections.singletonList(inputProperty), outputType, description);
+        super(id, inputProperties, outputType, description);
+        checkArgument(inputProperties.size() == 1);
         this.joinType = joinType;
         this.invocation = invocation;
-        if (joinType == FlinkJoinType.LEFT && condition != null) {
-            throw new TableException(
-                    "Currently Python correlate does not support conditions in left join.");
-        }
     }
 
     @SuppressWarnings("unchecked")
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
index e1e6be7..91585e9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
@@ -24,19 +24,43 @@ import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCor
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
 
+import java.util.Collections;
+import java.util.List;
+
 /** Stream exec node which matches along with join a Python user defined table function. */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class StreamExecPythonCorrelate extends CommonExecPythonCorrelate
         implements StreamExecNode<RowData> {
     public StreamExecPythonCorrelate(
             FlinkJoinType joinType,
             RexCall invocation,
-            RexNode condition,
             InputProperty inputProperty,
             RowType outputType,
             String description) {
-        super(joinType, invocation, condition, inputProperty, outputType, description);
+        this(
+                joinType,
+                invocation,
+                getNewNodeId(),
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
+    }
+
+    @JsonCreator
+    public StreamExecPythonCorrelate(
+            @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType joinType,
+            @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(joinType, (RexCall) invocation, id, inputProperties, outputType, description);
     }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java
index 80b2f19..829a4bd 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java
@@ -29,14 +29,17 @@ import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
 import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
 import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 
 import java.util.LinkedList;
@@ -112,10 +115,36 @@ public class PythonCorrelateSplitRule extends RelOptRule {
         for (int i = 0; i < primitiveFieldCount; i++) {
             calcProjects.add(RexInputRef.of(i, rowType));
         }
+        // change RexCorrelVariable to RexInputRef.
+        RexVisitorImpl<RexNode> visitor =
+                new RexVisitorImpl<RexNode>(true) {
+                    @Override
+                    public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
+                        RexNode expr = fieldAccess.getReferenceExpr();
+                        if (expr instanceof RexCorrelVariable) {
+                            RelDataTypeField field = fieldAccess.getField();
+                            return new RexInputRef(field.getIndex(), field.getType());
+                        } else {
+                            return rexBuilder.makeFieldAccess(
+                                    expr.accept(this), fieldAccess.getField().getIndex());
+                        }
+                    }
+                };
         // add the fields of the extracted rex calls.
         Iterator<RexNode> iterator = extractedRexNodes.iterator();
         while (iterator.hasNext()) {
-            calcProjects.add(iterator.next());
+            RexNode rexNode = iterator.next();
+            if (rexNode instanceof RexCall) {
+                RexCall rexCall = (RexCall) rexNode;
+                List<RexNode> newProjects =
+                        rexCall.getOperands().stream()
+                                .map(x -> x.accept(visitor))
+                                .collect(Collectors.toList());
+                RexCall newRexCall = rexCall.clone(rexCall.getType(), newProjects);
+                calcProjects.add(newRexCall);
+            } else {
+                calcProjects.add(rexNode);
+            }
         }
 
         List<String> nameList = new LinkedList<>();
@@ -252,18 +281,31 @@ public class PythonCorrelateSplitRule extends RelOptRule {
                     mergedCalc.copy(mergedCalc.getTraitSet(), newScan, mergedCalc.getProgram());
         }
 
-        FlinkLogicalCalc leftCalc =
-                createNewLeftCalc(left, rexBuilder, extractedRexNodes, correlate);
+        FlinkLogicalCorrelate newCorrelate;
+        if (extractedRexNodes.size() > 0) {
+            FlinkLogicalCalc leftCalc =
+                    createNewLeftCalc(left, rexBuilder, extractedRexNodes, correlate);
 
-        FlinkLogicalCorrelate newCorrelate =
-                new FlinkLogicalCorrelate(
-                        correlate.getCluster(),
-                        correlate.getTraitSet(),
-                        leftCalc,
-                        rightNewInput,
-                        correlate.getCorrelationId(),
-                        correlate.getRequiredColumns(),
-                        correlate.getJoinType());
+            newCorrelate =
+                    new FlinkLogicalCorrelate(
+                            correlate.getCluster(),
+                            correlate.getTraitSet(),
+                            leftCalc,
+                            rightNewInput,
+                            correlate.getCorrelationId(),
+                            correlate.getRequiredColumns(),
+                            correlate.getJoinType());
+        } else {
+            newCorrelate =
+                    new FlinkLogicalCorrelate(
+                            correlate.getCluster(),
+                            correlate.getTraitSet(),
+                            left,
+                            rightNewInput,
+                            correlate.getCorrelationId(),
+                            correlate.getRequiredColumns(),
+                            correlate.getJoinType());
+        }
 
         FlinkLogicalCalc newTopCalc =
                 createTopCalc(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
index 53f8405..298476c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCorrelate
 import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode}
@@ -64,10 +65,17 @@ class BatchPhysicalPythonCorrelate(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
+    if (condition.orNull != null) {
+      if (joinType == JoinRelType.LEFT) {
+        throw new TableException("Currently Python correlate does not support conditions" +
+                                   " in left join.")
+      }
+      throw new TableException("The condition of BatchPhysicalPythonCorrelate should be null.")
+    }
+
     new BatchExecPythonCorrelate(
       JoinTypeUtil.getFlinkJoinType(joinType),
       scan.getCall.asInstanceOf[RexCall],
-      condition.orNull,
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
index c3acaf3..434f163 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
@@ -17,16 +17,18 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate
 import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode}
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.planner.plan.utils.JoinTypeUtil
+
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.flink.table.planner.plan.utils.JoinTypeUtil
 
 /**
   * Flink RelNode which matches along with join a python user defined table function.
@@ -63,10 +65,17 @@ class StreamPhysicalPythonCorrelate(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
+    if (condition.orNull != null) {
+      if (joinType == JoinRelType.LEFT) {
+        throw new TableException("Currently Python correlate does not support conditions" +
+                                   " in left join.")
+      }
+      throw new TableException("The condition of StreamPhysicalPythonCorrelate should be null.")
+    }
+
     new StreamExecPythonCorrelate(
       JoinTypeUtil.getFlinkJoinType(joinType),
       scan.getCall.asInstanceOf[RexCall],
-      condition.orNull,
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala
index 10596b3..c6c9985 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala
@@ -22,7 +22,7 @@ import java.util.function.Function
 
 import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rex.{RexBuilder, RexCall, RexFieldAccess, RexInputRef, RexLocalRef, RexNode, RexProgram}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexCorrelVariable, RexFieldAccess, RexInputRef, RexLocalRef, RexNode, RexProgram}
 import org.apache.calcite.sql.validate.SqlValidatorUtil
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.functions.python.PythonFunctionKind
@@ -393,7 +393,13 @@ private class ScalarFunctionSplitter(
       expr match {
         case localRef: RexLocalRef if containsPythonCall(program.expandLocalRef(localRef))
           => getExtractedRexFieldAccess(fieldAccess, localRef.getIndex)
-        case _ => getExtractedRexNode(fieldAccess)
+        case _: RexCorrelVariable =>
+          val field = fieldAccess.getField
+          new RexInputRef(field.getIndex, field.getType)
+        case _ =>
+          val newFieldAccess = rexBuilder.makeFieldAccess(
+            expr.accept(this), fieldAccess.getField.getIndex)
+          getExtractedRexNode(newFieldAccess)
       }
     } else {
       fieldAccess
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java
index e3539a3..6ed1b2c 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java
@@ -48,7 +48,6 @@ public class JsonSerdeCoverageTest {
                     "StreamExecGroupTableAggregate",
                     "StreamExecPythonGroupTableAggregate",
                     "StreamExecPythonOverAggregate",
-                    "StreamExecPythonCorrelate",
                     "StreamExecSort",
                     "StreamExecMultipleInput",
                     "StreamExecValues");
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java
new file mode 100644
index 0000000..7b2739e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
+import org.apache.flink.table.planner.utils.MockPythonTableFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization/deserialization for correlate. */
+public class PythonCorrelateJsonPlanTest extends TableTestBase {
+    private StreamTableTestUtil util;
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        TableConfig tableConfig = TableConfig.getDefault();
+        util = streamTestUtil(tableConfig);
+        util.addFunction(
+                "TableFunc", new MockPythonTableFunction(), new RowTypeInfo(Types.INT, Types.INT));
+        util.addFunction("pyFunc", new PythonScalarFunction("pyFunc"));
+        tEnv = util.getTableEnv();
+
+        String srcTableDdl =
+                "CREATE TABLE MyTable (\n"
+                        + "  a int,\n"
+                        + "  b int,\n"
+                        + "  c int,\n"
+                        + "  d timestamp(3)\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'bounded' = 'false')";
+        tEnv.executeSql(srcTableDdl);
+    }
+
+    @Test
+    public void testPythonTableFunction() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a int,\n"
+                        + "  b int\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+
+        String sqlQuery =
+                "INSERT INTO MySink SELECT x, y FROM MyTable "
+                        + "LEFT JOIN LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) ON TRUE";
+        util.verifyJsonPlan(sqlQuery);
+    }
+
+    @Test
+    public void testJoinWithFilter() {
+        String sinkTableDdl =
+                "CREATE TABLE MySink (\n"
+                        + "  a int,\n"
+                        + "  b int\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
+        tEnv.executeSql(sinkTableDdl);
+
+        String sqlQuery =
+                "INSERT INTO MySink SELECT x, y FROM MyTable, "
+                        + "LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) WHERE x = a and y + 1 = y * y";
+        util.verifyJsonPlan(sqlQuery);
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
new file mode 100644
index 0000000..fe5899b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
@@ -0,0 +1,431 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+    "scanTableSource" : {
+      "identifier" : {
+        "catalogName" : "default_catalog",
+        "databaseName" : "default_database",
+        "tableName" : "MyTable"
+      },
+      "catalogTable" : {
+        "schema.3.data-type" : "TIMESTAMP(3)",
+        "schema.2.data-type" : "INT",
+        "schema.3.name" : "d",
+        "connector" : "values",
+        "schema.0.data-type" : "INT",
+        "schema.2.name" : "c",
+        "schema.1.name" : "b",
+        "bounded" : "false",
+        "schema.0.name" : "a",
+        "schema.1.data-type" : "INT"
+      }
+    },
+    "id" : 1,
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "INT"
+      }, {
+        "c" : "INT"
+      }, {
+        "d" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      } ]
+    },
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])",
+    "inputProperties" : [ ]
+  }, {
+    "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : true,
+        "precision" : 3
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "*",
+        "kind" : "TIMES",
+        "syntax" : "BINARY"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : true
+        }
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : true
+        }
+      } ],
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    } ],
+    "condition" : null,
+    "id" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "INT"
+      }, {
+        "c" : "INT"
+      }, {
+        "d" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "f0" : "INT"
+      } ]
+    },
+    "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])"
+  }, {
+    "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "TableFunc",
+        "kind" : "OTHER_FUNCTION",
+        "syntax" : "FUNCTION"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : true
+        }
+      }, {
+        "kind" : "REX_CALL",
+        "operator" : {
+          "name" : "pyFunc",
+          "kind" : "OTHER_FUNCTION",
+          "syntax" : "FUNCTION",
+          "displayName" : "pyFunc",
+          "functionKind" : "SCALAR",
+          "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRTY2FsYXJGdW5jdGlvbnMkUHl0aG9uU2NhbGFyRnVuY3Rpb275pBZGRJT8qAIAAUwABG5hbWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb26383IwrjqOqQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cHQABnB5RnVuYw"
+        },
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : {
+            "typeName" : "INTEGER",
+            "nullable" : true
+          }
+        }, {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : {
+            "typeName" : "INTEGER",
+            "nullable" : true
+          }
+        } ],
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : false
+        }
+      } ],
+      "type" : {
+        "structKind" : "FULLY_QUALIFIED",
+        "nullable" : false,
+        "fields" : [ {
+          "typeName" : "INTEGER",
+          "nullable" : true,
+          "fieldName" : "f0"
+        }, {
+          "typeName" : "INTEGER",
+          "nullable" : true,
+          "fieldName" : "f1"
+        } ]
+      }
+    },
+    "id" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "INT"
+      }, {
+        "c" : "INT"
+      }, {
+        "d" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "f0" : "INT"
+      }, {
+        "f00" : "INT"
+      }, {
+        "f1" : "INT"
+      } ]
+    },
+    "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,f00,f1], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER f00, INTEGER f1)], joinType=[INNER])"
+  }, {
+    "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    } ],
+    "condition" : {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "AND",
+        "kind" : "AND",
+        "syntax" : "BINARY"
+      },
+      "operands" : [ {
+        "kind" : "REX_CALL",
+        "operator" : {
+          "name" : "=",
+          "kind" : "EQUALS",
+          "syntax" : "BINARY"
+        },
+        "operands" : [ {
+          "kind" : "REX_CALL",
+          "operator" : {
+            "name" : "+",
+            "kind" : "PLUS",
+            "syntax" : "BINARY"
+          },
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 6,
+            "type" : {
+              "typeName" : "INTEGER",
+              "nullable" : true
+            }
+          }, {
+            "kind" : "LITERAL",
+            "value" : "1",
+            "type" : {
+              "typeName" : "INTEGER",
+              "nullable" : false
+            }
+          } ],
+          "type" : {
+            "typeName" : "INTEGER",
+            "nullable" : true
+          }
+        }, {
+          "kind" : "REX_CALL",
+          "operator" : {
+            "name" : "*",
+            "kind" : "TIMES",
+            "syntax" : "BINARY"
+          },
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 6,
+            "type" : {
+              "typeName" : "INTEGER",
+              "nullable" : true
+            }
+          }, {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 6,
+            "type" : {
+              "typeName" : "INTEGER",
+              "nullable" : true
+            }
+          } ],
+          "type" : {
+            "typeName" : "INTEGER",
+            "nullable" : true
+          }
+        } ],
+        "type" : {
+          "typeName" : "BOOLEAN",
+          "nullable" : true
+        }
+      }, {
+        "kind" : "REX_CALL",
+        "operator" : {
+          "name" : "=",
+          "kind" : "EQUALS",
+          "syntax" : "BINARY"
+        },
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 5,
+          "type" : {
+            "typeName" : "INTEGER",
+            "nullable" : true
+          }
+        }, {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : {
+            "typeName" : "INTEGER",
+            "nullable" : true
+          }
+        } ],
+        "type" : {
+          "typeName" : "BOOLEAN",
+          "nullable" : true
+        }
+      } ],
+      "type" : {
+        "typeName" : "BOOLEAN",
+        "nullable" : true
+      }
+    },
+    "id" : 4,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "x" : "INT"
+      }, {
+        "y" : "INT"
+      } ]
+    },
+    "description" : "Calc(select=[f00 AS x, f1 AS y], where=[(((f1 + 1) = (f1 * f1)) AND (f00 = a))])"
+  }, {
+    "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
+    "dynamicTableSink" : {
+      "identifier" : {
+        "catalogName" : "default_catalog",
+        "databaseName" : "default_database",
+        "tableName" : "MySink"
+      },
+      "catalogTable" : {
+        "table-sink-class" : "DEFAULT",
+        "connector" : "values",
+        "schema.0.data-type" : "INT",
+        "schema.1.name" : "b",
+        "schema.0.name" : "a",
+        "schema.1.data-type" : "INT"
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "id" : 5,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "x" : "INT"
+      }, {
+        "y" : "INT"
+      } ]
+    },
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
new file mode 100644
index 0000000..c2b71f2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
@@ -0,0 +1,329 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+    "scanTableSource" : {
+      "identifier" : {
+        "catalogName" : "default_catalog",
+        "databaseName" : "default_database",
+        "tableName" : "MyTable"
+      },
+      "catalogTable" : {
+        "schema.3.data-type" : "TIMESTAMP(3)",
+        "schema.2.data-type" : "INT",
+        "schema.3.name" : "d",
+        "connector" : "values",
+        "schema.0.data-type" : "INT",
+        "schema.2.name" : "c",
+        "schema.1.name" : "b",
+        "bounded" : "false",
+        "schema.0.name" : "a",
+        "schema.1.data-type" : "INT"
+      }
+    },
+    "id" : 1,
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "INT"
+      }, {
+        "c" : "INT"
+      }, {
+        "d" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      } ]
+    },
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])",
+    "inputProperties" : [ ]
+  }, {
+    "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : {
+        "typeName" : "TIMESTAMP",
+        "nullable" : true,
+        "precision" : 3
+      }
+    }, {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "*",
+        "kind" : "TIMES",
+        "syntax" : "BINARY"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : true
+        }
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : true
+        }
+      } ],
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    } ],
+    "condition" : null,
+    "id" : 2,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "INT"
+      }, {
+        "c" : "INT"
+      }, {
+        "d" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "f0" : "INT"
+      } ]
+    },
+    "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])"
+  }, {
+    "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate",
+    "joinType" : "LEFT",
+    "functionCall" : {
+      "kind" : "REX_CALL",
+      "operator" : {
+        "name" : "TableFunc",
+        "kind" : "OTHER_FUNCTION",
+        "syntax" : "FUNCTION"
+      },
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : true
+        }
+      }, {
+        "kind" : "REX_CALL",
+        "operator" : {
+          "name" : "pyFunc",
+          "kind" : "OTHER_FUNCTION",
+          "syntax" : "FUNCTION",
+          "displayName" : "pyFunc",
+          "functionKind" : "SCALAR",
+          "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRTY2FsYXJGdW5jdGlvbnMkUHl0aG9uU2NhbGFyRnVuY3Rpb275pBZGRJT8qAIAAUwABG5hbWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb26383IwrjqOqQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cHQABnB5RnVuYw"
+        },
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : {
+            "typeName" : "INTEGER",
+            "nullable" : true
+          }
+        }, {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : {
+            "typeName" : "INTEGER",
+            "nullable" : true
+          }
+        } ],
+        "type" : {
+          "typeName" : "INTEGER",
+          "nullable" : false
+        }
+      } ],
+      "type" : {
+        "structKind" : "FULLY_QUALIFIED",
+        "nullable" : false,
+        "fields" : [ {
+          "typeName" : "INTEGER",
+          "nullable" : true,
+          "fieldName" : "f0"
+        }, {
+          "typeName" : "INTEGER",
+          "nullable" : true,
+          "fieldName" : "f1"
+        } ]
+      }
+    },
+    "id" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "a" : "INT"
+      }, {
+        "b" : "INT"
+      }, {
+        "c" : "INT"
+      }, {
+        "d" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : true,
+          "precision" : 3,
+          "kind" : "REGULAR"
+        }
+      }, {
+        "f0" : "INT"
+      }, {
+        "f00" : "INT"
+      }, {
+        "f1" : "INT"
+      } ]
+    },
+    "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,f00,f1], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER f00, INTEGER f1)], joinType=[LEFT])"
+  }, {
+    "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : {
+        "typeName" : "INTEGER",
+        "nullable" : true
+      }
+    } ],
+    "condition" : null,
+    "id" : 4,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "x" : "INT"
+      }, {
+        "y" : "INT"
+      } ]
+    },
+    "description" : "Calc(select=[f00 AS x, f1 AS y])"
+  }, {
+    "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
+    "dynamicTableSink" : {
+      "identifier" : {
+        "catalogName" : "default_catalog",
+        "databaseName" : "default_database",
+        "tableName" : "MySink"
+      },
+      "catalogTable" : {
+        "table-sink-class" : "DEFAULT",
+        "connector" : "values",
+        "schema.0.data-type" : "INT",
+        "schema.1.name" : "b",
+        "schema.0.name" : "a",
+        "schema.1.data-type" : "INT"
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "id" : 5,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "nullable" : true,
+      "fields" : [ {
+        "x" : "INT"
+      }, {
+        "y" : "INT"
+      } ]
+    },
+    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml
index a966437..72ac8d8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml
@@ -31,12 +31,12 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3], y=[$4])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, c, f00 AS f0, f10 AS f1], where=[AND(=(f0, 2), =(+(f10, 1), *(f10, f10)), =(f00, a))])
-+- FlinkLogicalCalc(select=[a, b, c, f00, f10, pyFunc(f00, f00) AS f0])
+FlinkLogicalCalc(select=[a, b, c, f00 AS f0, f1], where=[AND(=(f0, 2), =(+(f1, 1), *(f1, f1)), =(f00, a))])
++- FlinkLogicalCalc(select=[a, b, c, f00, f1, pyFunc(f00, f00) AS f0])
    +- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 1}])
-      :- FlinkLogicalCalc(select=[a, b, c, *($cor0.a, $cor0.a) AS f0, $cor0.b AS f1])
+      :- FlinkLogicalCalc(select=[a, b, c, *(a, a) AS f0])
       :  +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-      +- FlinkLogicalTableFunctionScan(invocation=[func($3, $4)], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
+      +- FlinkLogicalTableFunctionScan(invocation=[func($3, $1)], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml
index 0d4bf9e..c77a625 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml
@@ -32,9 +32,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3])
       <![CDATA[
 FlinkLogicalCalc(select=[a, b, c, f00 AS f0])
 +- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
-   :- FlinkLogicalCalc(select=[a, b, c, pyFunc(f0) AS f0])
-   :  +- FlinkLogicalCalc(select=[a, b, c, $cor0.c AS f0])
-   :     +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   :- FlinkLogicalCalc(select=[a, b, c, pyFunc(c) AS f0])
+   :  +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
    +- FlinkLogicalTableFunctionScan(invocation=[javaFunc($3)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
@@ -53,11 +52,11 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3], y=[$4])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, c, f00 AS f0, f10 AS f1])
+FlinkLogicalCalc(select=[a, b, c, f00 AS f0, f1])
 +- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 1, 2}])
-   :- FlinkLogicalCalc(select=[a, b, c, *($cor0.a, $cor0.a) AS f0, $cor0.b AS f1, $cor0.c AS f2])
+   :- FlinkLogicalCalc(select=[a, b, c, *(a, a) AS f0])
    :  +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- FlinkLogicalTableFunctionScan(invocation=[func($3, pyFunc($4, $5))], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
+   +- FlinkLogicalTableFunctionScan(invocation=[func($3, pyFunc($1, $2))], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
   </TestCase>
@@ -78,7 +77,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$4])
 FlinkLogicalCalc(select=[a, b, c, f00 AS x])
 +- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
    :- FlinkLogicalCalc(select=[a, b, c, d, pyFunc(f0) AS f0])
-   :  +- FlinkLogicalCalc(select=[a, b, c, d, $cor0.d._1 AS f0])
+   :  +- FlinkLogicalCalc(select=[a, b, c, d, d._1 AS f0])
    :     +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
    +- FlinkLogicalTableFunctionScan(invocation=[javaFunc($4)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;])
 ]]>
@@ -100,9 +99,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$4], y=[$5])
       <![CDATA[
 FlinkLogicalCalc(select=[a, b, c, f00 AS x, f10 AS y])
 +- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2, 3}])
-   :- FlinkLogicalCalc(select=[a, b, c, d, *($cor0.d._1, $cor0.a) AS f0, $cor0.d._2 AS f1, $cor0.c AS f2])
+   :- FlinkLogicalCalc(select=[a, b, c, d, *(d._1, a) AS f0, d._2 AS f1])
    :  +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
-   +- FlinkLogicalTableFunctionScan(invocation=[func($4, pyFunc($5, $6))], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
+   +- FlinkLogicalTableFunctionScan(invocation=[func($4, pyFunc($5, $2))], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
   </TestCase>