You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2021/05/17 13:53:37 UTC
[flink] branch master updated:
[FLINK-22650][python][table-planner-blink] Support
StreamExecPythonCorrelate json serialization/deserialization
This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d09745a [FLINK-22650][python][table-planner-blink] Support StreamExecPythonCorrelate json serialization/deserialization
d09745a is described below
commit d09745a7341669731ed0bc1662f3eb16b5e56e2a
Author: huangxingbo <hx...@gmail.com>
AuthorDate: Sat May 15 15:24:43 2021 +0800
[FLINK-22650][python][table-planner-blink] Support StreamExecPythonCorrelate json serialization/deserialization
This closes #15922.
---
flink-python/pyflink/table/tests/test_udtf.py | 50 ++-
.../nodes/exec/batch/BatchExecPythonCorrelate.java | 12 +-
.../exec/common/CommonExecPythonCorrelate.java | 26 +-
.../exec/stream/StreamExecPythonCorrelate.java | 28 +-
.../rules/logical/PythonCorrelateSplitRule.java | 66 +++-
.../batch/BatchPhysicalPythonCorrelate.scala | 10 +-
.../stream/StreamPhysicalPythonCorrelate.scala | 13 +-
.../plan/rules/logical/PythonCalcSplitRule.scala | 10 +-
.../nodes/exec/stream/JsonSerdeCoverageTest.java | 1 -
.../exec/stream/PythonCorrelateJsonPlanTest.java | 92 +++++
.../testJoinWithFilter.out | 431 +++++++++++++++++++++
.../testPythonTableFunction.out | 329 ++++++++++++++++
.../CalcPythonCorrelateTransposeRuleTest.xml | 8 +-
.../rules/logical/PythonCorrelateSplitRuleTest.xml | 17 +-
14 files changed, 1048 insertions(+), 45 deletions(-)
diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 98ca8f5..976b2f6 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -78,7 +78,55 @@ class PyFlinkStreamUserDefinedTableFunctionTests(UserDefinedTableFunctionTests,
class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
PyFlinkBlinkStreamTableTestCase):
- pass
+ def test_execute_from_json_plan(self):
+ # write the source CSV file and pick the sink output path
+ tmp_dir = self.tempdir
+ data = ['1,1', '3,2', '2,1']
+ source_path = tmp_dir + '/test_execute_from_json_plan_input.csv'
+ sink_path = tmp_dir + '/test_execute_from_json_plan_out'
+ with open(source_path, 'w') as fd:
+ for ele in data:
+ fd.write(ele + '\n')
+
+ source_table = """
+ CREATE TABLE source_table (
+ a BIGINT,
+ b BIGINT
+ ) WITH (
+ 'connector' = 'filesystem',
+ 'path' = '%s',
+ 'format' = 'csv'
+ )
+ """ % source_path
+ self.t_env.execute_sql(source_table)
+
+ self.t_env.execute_sql("""
+ CREATE TABLE sink_table (
+ a BIGINT,
+ b BIGINT,
+ c BIGINT
+ ) WITH (
+ 'connector' = 'filesystem',
+ 'path' = '%s',
+ 'format' = 'csv'
+ )
+ """ % sink_path)
+
+ self.t_env.create_temporary_system_function(
+ "multi_emit", udtf(MultiEmit(), result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()]))
+
+ json_plan = self.t_env._j_tenv.getJsonPlan("INSERT INTO sink_table "
+ "SELECT a, x, y FROM source_table "
+ "LEFT JOIN LATERAL TABLE(multi_emit(a, b))"
+ " as T(x, y)"
+ " ON TRUE")
+ from py4j.java_gateway import get_method
+ get_method(self.t_env._j_tenv.executeJsonPlan(json_plan), "await")()
+
+ import glob
+ lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
+ lines.sort()
+ self.assertEqual(lines, ['1,1,0', '2,2,0', '3,3,0', '3,3,1'])
class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedTableFunctionTests,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java
index 9e4945f..4372ff9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.java
@@ -25,7 +25,8 @@ import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexNode;
+
+import java.util.Collections;
/** Batch exec node which matches along with join a Python user defined table function. */
public class BatchExecPythonCorrelate extends CommonExecPythonCorrelate
@@ -34,10 +35,15 @@ public class BatchExecPythonCorrelate extends CommonExecPythonCorrelate
public BatchExecPythonCorrelate(
FlinkJoinType joinType,
RexCall invocation,
- RexNode condition,
InputProperty inputProperty,
RowType outputType,
String description) {
- super(joinType, invocation, condition, inputProperty, outputType, description);
+ super(
+ joinType,
+ invocation,
+ getNewNodeId(),
+ Collections.singletonList(inputProperty),
+ outputType,
+ description);
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
index 12720d1..a5492d2 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
@@ -38,37 +38,47 @@ import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import java.lang.reflect.Constructor;
-import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
/** Base {@link ExecNode} which matches along with join a Python user defined table function. */
+@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
implements SingleTransformationTranslator<RowData> {
+
+ public static final String FIELD_NAME_JOIN_TYPE = "joinType";
+ public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
+
private static final String PYTHON_TABLE_FUNCTION_OPERATOR_NAME =
"org.apache.flink.table.runtime.operators.python.table.RowDataPythonTableFunctionOperator";
+ @JsonProperty(FIELD_NAME_JOIN_TYPE)
private final FlinkJoinType joinType;
+
+ @JsonProperty(FIELD_NAME_FUNCTION_CALL)
private final RexCall invocation;
public CommonExecPythonCorrelate(
FlinkJoinType joinType,
RexCall invocation,
- RexNode condition,
- InputProperty inputProperty,
+ int id,
+ List<InputProperty> inputProperties,
RowType outputType,
String description) {
- super(Collections.singletonList(inputProperty), outputType, description);
+ super(id, inputProperties, outputType, description);
+ checkArgument(inputProperties.size() == 1);
this.joinType = joinType;
this.invocation = invocation;
- if (joinType == FlinkJoinType.LEFT && condition != null) {
- throw new TableException(
- "Currently Python correlate does not support conditions in left join.");
- }
}
@SuppressWarnings("unchecked")
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
index e1e6be7..91585e9 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.java
@@ -24,19 +24,43 @@ import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonCor
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
+import java.util.Collections;
+import java.util.List;
+
/** Stream exec node which matches along with join a Python user defined table function. */
+@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamExecPythonCorrelate extends CommonExecPythonCorrelate
implements StreamExecNode<RowData> {
public StreamExecPythonCorrelate(
FlinkJoinType joinType,
RexCall invocation,
- RexNode condition,
InputProperty inputProperty,
RowType outputType,
String description) {
- super(joinType, invocation, condition, inputProperty, outputType, description);
+ this(
+ joinType,
+ invocation,
+ getNewNodeId(),
+ Collections.singletonList(inputProperty),
+ outputType,
+ description);
+ }
+
+ @JsonCreator
+ public StreamExecPythonCorrelate(
+ @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType joinType,
+ @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(joinType, (RexCall) invocation, id, inputProperties, outputType, description);
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java
index 80b2f19..829a4bd 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java
@@ -29,14 +29,17 @@ import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import java.util.LinkedList;
@@ -112,10 +115,36 @@ public class PythonCorrelateSplitRule extends RelOptRule {
for (int i = 0; i < primitiveFieldCount; i++) {
calcProjects.add(RexInputRef.of(i, rowType));
}
+ // Rewrite field accesses on a RexCorrelVariable into RexInputRefs by field index.
+ RexVisitorImpl<RexNode> visitor =
+ new RexVisitorImpl<RexNode>(true) {
+ @Override
+ public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
+ RexNode expr = fieldAccess.getReferenceExpr();
+ if (expr instanceof RexCorrelVariable) {
+ RelDataTypeField field = fieldAccess.getField();
+ return new RexInputRef(field.getIndex(), field.getType());
+ } else {
+ return rexBuilder.makeFieldAccess(
+ expr.accept(this), fieldAccess.getField().getIndex());
+ }
+ }
+ };
// add the fields of the extracted rex calls.
Iterator<RexNode> iterator = extractedRexNodes.iterator();
while (iterator.hasNext()) {
- calcProjects.add(iterator.next());
+ RexNode rexNode = iterator.next();
+ if (rexNode instanceof RexCall) {
+ RexCall rexCall = (RexCall) rexNode;
+ List<RexNode> newProjects =
+ rexCall.getOperands().stream()
+ .map(x -> x.accept(visitor))
+ .collect(Collectors.toList());
+ RexCall newRexCall = rexCall.clone(rexCall.getType(), newProjects);
+ calcProjects.add(newRexCall);
+ } else {
+ calcProjects.add(rexNode);
+ }
}
List<String> nameList = new LinkedList<>();
@@ -252,18 +281,31 @@ public class PythonCorrelateSplitRule extends RelOptRule {
mergedCalc.copy(mergedCalc.getTraitSet(), newScan, mergedCalc.getProgram());
}
- FlinkLogicalCalc leftCalc =
- createNewLeftCalc(left, rexBuilder, extractedRexNodes, correlate);
+ FlinkLogicalCorrelate newCorrelate;
+ if (extractedRexNodes.size() > 0) {
+ FlinkLogicalCalc leftCalc =
+ createNewLeftCalc(left, rexBuilder, extractedRexNodes, correlate);
- FlinkLogicalCorrelate newCorrelate =
- new FlinkLogicalCorrelate(
- correlate.getCluster(),
- correlate.getTraitSet(),
- leftCalc,
- rightNewInput,
- correlate.getCorrelationId(),
- correlate.getRequiredColumns(),
- correlate.getJoinType());
+ newCorrelate =
+ new FlinkLogicalCorrelate(
+ correlate.getCluster(),
+ correlate.getTraitSet(),
+ leftCalc,
+ rightNewInput,
+ correlate.getCorrelationId(),
+ correlate.getRequiredColumns(),
+ correlate.getJoinType());
+ } else {
+ newCorrelate =
+ new FlinkLogicalCorrelate(
+ correlate.getCluster(),
+ correlate.getTraitSet(),
+ left,
+ rightNewInput,
+ correlate.getCorrelationId(),
+ correlate.getRequiredColumns(),
+ correlate.getJoinType());
+ }
FlinkLogicalCalc newTopCalc =
createTopCalc(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
index 53f8405..298476c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.batch
+import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCorrelate
import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode}
@@ -64,10 +65,17 @@ class BatchPhysicalPythonCorrelate(
}
override def translateToExecNode(): ExecNode[_] = {
+ if (condition.orNull != null) {
+ if (joinType == JoinRelType.LEFT) {
+ throw new TableException("Currently Python correlate does not support conditions" +
+ " in left join.")
+ }
+ throw new TableException("The condition of BatchPhysicalPythonCorrelate should be null.")
+ }
+
new BatchExecPythonCorrelate(
JoinTypeUtil.getFlinkJoinType(joinType),
scan.getCall.asInstanceOf[RexCall],
- condition.orNull,
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
index c3acaf3..434f163 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
@@ -17,16 +17,18 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate
import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode}
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.planner.plan.utils.JoinTypeUtil
+
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.flink.table.planner.plan.utils.JoinTypeUtil
/**
* Flink RelNode which matches along with join a python user defined table function.
@@ -63,10 +65,17 @@ class StreamPhysicalPythonCorrelate(
}
override def translateToExecNode(): ExecNode[_] = {
+ if (condition.orNull != null) {
+ if (joinType == JoinRelType.LEFT) {
+ throw new TableException("Currently Python correlate does not support conditions" +
+ " in left join.")
+ }
+ throw new TableException("The condition of StreamPhysicalPythonCorrelate should be null.")
+ }
+
new StreamExecPythonCorrelate(
JoinTypeUtil.getFlinkJoinType(joinType),
scan.getCall.asInstanceOf[RexCall],
- condition.orNull,
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala
index 10596b3..c6c9985 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala
@@ -22,7 +22,7 @@ import java.util.function.Function
import org.apache.calcite.plan.RelOptRule.{any, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rex.{RexBuilder, RexCall, RexFieldAccess, RexInputRef, RexLocalRef, RexNode, RexProgram}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexCorrelVariable, RexFieldAccess, RexInputRef, RexLocalRef, RexNode, RexProgram}
import org.apache.calcite.sql.validate.SqlValidatorUtil
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.functions.python.PythonFunctionKind
@@ -393,7 +393,13 @@ private class ScalarFunctionSplitter(
expr match {
case localRef: RexLocalRef if containsPythonCall(program.expandLocalRef(localRef))
=> getExtractedRexFieldAccess(fieldAccess, localRef.getIndex)
- case _ => getExtractedRexNode(fieldAccess)
+ case _: RexCorrelVariable =>
+ val field = fieldAccess.getField
+ new RexInputRef(field.getIndex, field.getType)
+ case _ =>
+ val newFieldAccess = rexBuilder.makeFieldAccess(
+ expr.accept(this), fieldAccess.getField.getIndex)
+ getExtractedRexNode(newFieldAccess)
}
} else {
fieldAccess
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java
index e3539a3..6ed1b2c 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JsonSerdeCoverageTest.java
@@ -48,7 +48,6 @@ public class JsonSerdeCoverageTest {
"StreamExecGroupTableAggregate",
"StreamExecPythonGroupTableAggregate",
"StreamExecPythonOverAggregate",
- "StreamExecPythonCorrelate",
"StreamExecSort",
"StreamExecMultipleInput",
"StreamExecValues");
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java
new file mode 100644
index 0000000..7b2739e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
+import org.apache.flink.table.planner.utils.MockPythonTableFunction;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test JSON serialization/deserialization for Python correlate. */
+public class PythonCorrelateJsonPlanTest extends TableTestBase {
+ private StreamTableTestUtil util;
+ private TableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ TableConfig tableConfig = TableConfig.getDefault();
+ util = streamTestUtil(tableConfig);
+ util.addFunction(
+ "TableFunc", new MockPythonTableFunction(), new RowTypeInfo(Types.INT, Types.INT));
+ util.addFunction("pyFunc", new PythonScalarFunction("pyFunc"));
+ tEnv = util.getTableEnv();
+
+ String srcTableDdl =
+ "CREATE TABLE MyTable (\n"
+ + " a int,\n"
+ + " b int,\n"
+ + " c int,\n"
+ + " d timestamp(3)\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'false')";
+ tEnv.executeSql(srcTableDdl);
+ }
+
+ @Test
+ public void testPythonTableFunction() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a int,\n"
+ + " b int\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+
+ String sqlQuery =
+ "INSERT INTO MySink SELECT x, y FROM MyTable "
+ + "LEFT JOIN LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) ON TRUE";
+ util.verifyJsonPlan(sqlQuery);
+ }
+
+ @Test
+ public void testJoinWithFilter() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a int,\n"
+ + " b int\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+
+ String sqlQuery =
+ "INSERT INTO MySink SELECT x, y FROM MyTable, "
+ + "LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) WHERE x = a and y + 1 = y * y";
+ util.verifyJsonPlan(sqlQuery);
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
new file mode 100644
index 0000000..fe5899b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
@@ -0,0 +1,431 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+ "scanTableSource" : {
+ "identifier" : {
+ "catalogName" : "default_catalog",
+ "databaseName" : "default_database",
+ "tableName" : "MyTable"
+ },
+ "catalogTable" : {
+ "schema.3.data-type" : "TIMESTAMP(3)",
+ "schema.2.data-type" : "INT",
+ "schema.3.name" : "d",
+ "connector" : "values",
+ "schema.0.data-type" : "INT",
+ "schema.2.name" : "c",
+ "schema.1.name" : "b",
+ "bounded" : "false",
+ "schema.0.name" : "a",
+ "schema.1.data-type" : "INT"
+ }
+ },
+ "id" : 1,
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "INT"
+ }, {
+ "c" : "INT"
+ }, {
+ "d" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ } ]
+ },
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])",
+ "inputProperties" : [ ]
+ }, {
+ "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : true,
+ "precision" : 3
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "*",
+ "kind" : "TIMES",
+ "syntax" : "BINARY"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "condition" : null,
+ "id" : 2,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "INT"
+ }, {
+ "c" : "INT"
+ }, {
+ "d" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "f0" : "INT"
+ } ]
+ },
+ "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])"
+ }, {
+ "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate",
+ "joinType" : "INNER",
+ "functionCall" : {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "TableFunc",
+ "kind" : "OTHER_FUNCTION",
+ "syntax" : "FUNCTION"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "pyFunc",
+ "kind" : "OTHER_FUNCTION",
+ "syntax" : "FUNCTION",
+ "displayName" : "pyFunc",
+ "functionKind" : "SCALAR",
+ "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRTY2FsYXJGdW5jdGlvbnMkUHl0aG9uU2NhbGFyRnVuY3Rpb275pBZGRJT8qAIAAUwABG5hbWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb26383IwrjqOqQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cHQABnB5RnVuYw"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : false
+ }
+ } ],
+ "type" : {
+ "structKind" : "FULLY_QUALIFIED",
+ "nullable" : false,
+ "fields" : [ {
+ "typeName" : "INTEGER",
+ "nullable" : true,
+ "fieldName" : "f0"
+ }, {
+ "typeName" : "INTEGER",
+ "nullable" : true,
+ "fieldName" : "f1"
+ } ]
+ }
+ },
+ "id" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "INT"
+ }, {
+ "c" : "INT"
+ }, {
+ "d" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "f0" : "INT"
+ }, {
+ "f00" : "INT"
+ }, {
+ "f1" : "INT"
+ } ]
+ },
+ "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,f00,f1], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER f00, INTEGER f1)], joinType=[INNER])"
+ }, {
+ "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "condition" : {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "AND",
+ "kind" : "AND",
+ "syntax" : "BINARY"
+ },
+ "operands" : [ {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "=",
+ "kind" : "EQUALS",
+ "syntax" : "BINARY"
+ },
+ "operands" : [ {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "+",
+ "kind" : "PLUS",
+ "syntax" : "BINARY"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : false
+ }
+ } ],
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "*",
+ "kind" : "TIMES",
+ "syntax" : "BINARY"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "type" : {
+ "typeName" : "BOOLEAN",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "=",
+ "kind" : "EQUALS",
+ "syntax" : "BINARY"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "type" : {
+ "typeName" : "BOOLEAN",
+ "nullable" : true
+ }
+ } ],
+ "type" : {
+ "typeName" : "BOOLEAN",
+ "nullable" : true
+ }
+ },
+ "id" : 4,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "x" : "INT"
+ }, {
+ "y" : "INT"
+ } ]
+ },
+ "description" : "Calc(select=[f00 AS x, f1 AS y], where=[(((f1 + 1) = (f1 * f1)) AND (f00 = a))])"
+ }, {
+ "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
+ "dynamicTableSink" : {
+ "identifier" : {
+ "catalogName" : "default_catalog",
+ "databaseName" : "default_database",
+ "tableName" : "MySink"
+ },
+ "catalogTable" : {
+ "table-sink-class" : "DEFAULT",
+ "connector" : "values",
+ "schema.0.data-type" : "INT",
+ "schema.1.name" : "b",
+ "schema.0.name" : "a",
+ "schema.1.data-type" : "INT"
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "id" : 5,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "x" : "INT"
+ }, {
+ "y" : "INT"
+ } ]
+ },
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
new file mode 100644
index 0000000..c2b71f2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
@@ -0,0 +1,329 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+ "scanTableSource" : {
+ "identifier" : {
+ "catalogName" : "default_catalog",
+ "databaseName" : "default_database",
+ "tableName" : "MyTable"
+ },
+ "catalogTable" : {
+ "schema.3.data-type" : "TIMESTAMP(3)",
+ "schema.2.data-type" : "INT",
+ "schema.3.name" : "d",
+ "connector" : "values",
+ "schema.0.data-type" : "INT",
+ "schema.2.name" : "c",
+ "schema.1.name" : "b",
+ "bounded" : "false",
+ "schema.0.name" : "a",
+ "schema.1.data-type" : "INT"
+ }
+ },
+ "id" : 1,
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "INT"
+ }, {
+ "c" : "INT"
+ }, {
+ "d" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ } ]
+ },
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])",
+ "inputProperties" : [ ]
+ }, {
+ "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : {
+ "typeName" : "TIMESTAMP",
+ "nullable" : true,
+ "precision" : 3
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "*",
+ "kind" : "TIMES",
+ "syntax" : "BINARY"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "condition" : null,
+ "id" : 2,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "INT"
+ }, {
+ "c" : "INT"
+ }, {
+ "d" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "f0" : "INT"
+ } ]
+ },
+ "description" : "Calc(select=[a, b, c, d, (a * a) AS f0])"
+ }, {
+ "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate",
+ "joinType" : "LEFT",
+ "functionCall" : {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "TableFunc",
+ "kind" : "OTHER_FUNCTION",
+ "syntax" : "FUNCTION"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "REX_CALL",
+ "operator" : {
+ "name" : "pyFunc",
+ "kind" : "OTHER_FUNCTION",
+ "syntax" : "FUNCTION",
+ "displayName" : "pyFunc",
+ "functionKind" : "SCALAR",
+ "instance" : "rO0ABXNyAGBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucnVudGltZS51dGlscy5KYXZhVXNlckRlZmluZWRTY2FsYXJGdW5jdGlvbnMkUHl0aG9uU2NhbGFyRnVuY3Rpb275pBZGRJT8qAIAAUwABG5hbWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb26383IwrjqOqQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cHQABnB5RnVuYw"
+ },
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : false
+ }
+ } ],
+ "type" : {
+ "structKind" : "FULLY_QUALIFIED",
+ "nullable" : false,
+ "fields" : [ {
+ "typeName" : "INTEGER",
+ "nullable" : true,
+ "fieldName" : "f0"
+ }, {
+ "typeName" : "INTEGER",
+ "nullable" : true,
+ "fieldName" : "f1"
+ } ]
+ }
+ },
+ "id" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "a" : "INT"
+ }, {
+ "b" : "INT"
+ }, {
+ "c" : "INT"
+ }, {
+ "d" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : true,
+ "precision" : 3,
+ "kind" : "REGULAR"
+ }
+ }, {
+ "f0" : "INT"
+ }, {
+ "f00" : "INT"
+ }, {
+ "f1" : "INT"
+ } ]
+ },
+ "description" : "PythonCorrelate(invocation=[TableFunc($4, pyFunc($0, $1))], correlate=[table(TableFunc(f0,pyFunc(a, b)))], select=[a,b,c,d,f0,f00,f1], rowType=[RecordType(INTEGER a, INTEGER b, INTEGER c, TIMESTAMP(3) d, INTEGER f0, INTEGER f00, INTEGER f1)], joinType=[LEFT])"
+ }, {
+ "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : {
+ "typeName" : "INTEGER",
+ "nullable" : true
+ }
+ } ],
+ "condition" : null,
+ "id" : 4,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "x" : "INT"
+ }, {
+ "y" : "INT"
+ } ]
+ },
+ "description" : "Calc(select=[f00 AS x, f1 AS y])"
+ }, {
+ "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
+ "dynamicTableSink" : {
+ "identifier" : {
+ "catalogName" : "default_catalog",
+ "databaseName" : "default_database",
+ "tableName" : "MySink"
+ },
+ "catalogTable" : {
+ "table-sink-class" : "DEFAULT",
+ "connector" : "values",
+ "schema.0.data-type" : "INT",
+ "schema.1.name" : "b",
+ "schema.0.name" : "a",
+ "schema.1.data-type" : "INT"
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "id" : 5,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "nullable" : true,
+ "fields" : [ {
+ "x" : "INT"
+ }, {
+ "y" : "INT"
+ } ]
+ },
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[x, y])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml
index a966437..72ac8d8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRuleTest.xml
@@ -31,12 +31,12 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3], y=[$4])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-FlinkLogicalCalc(select=[a, b, c, f00 AS f0, f10 AS f1], where=[AND(=(f0, 2), =(+(f10, 1), *(f10, f10)), =(f00, a))])
-+- FlinkLogicalCalc(select=[a, b, c, f00, f10, pyFunc(f00, f00) AS f0])
+FlinkLogicalCalc(select=[a, b, c, f00 AS f0, f1], where=[AND(=(f0, 2), =(+(f1, 1), *(f1, f1)), =(f00, a))])
++- FlinkLogicalCalc(select=[a, b, c, f00, f1, pyFunc(f00, f00) AS f0])
+- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 1}])
- :- FlinkLogicalCalc(select=[a, b, c, *($cor0.a, $cor0.a) AS f0, $cor0.b AS f1])
+ :- FlinkLogicalCalc(select=[a, b, c, *(a, a) AS f0])
: +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- FlinkLogicalTableFunctionScan(invocation=[func($3, $4)], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
+ +- FlinkLogicalTableFunctionScan(invocation=[func($3, $1)], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml
index 0d4bf9e..c77a625 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRuleTest.xml
@@ -32,9 +32,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3])
<![CDATA[
FlinkLogicalCalc(select=[a, b, c, f00 AS f0])
+- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
- :- FlinkLogicalCalc(select=[a, b, c, pyFunc(f0) AS f0])
- : +- FlinkLogicalCalc(select=[a, b, c, $cor0.c AS f0])
- : +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ :- FlinkLogicalCalc(select=[a, b, c, pyFunc(c) AS f0])
+ : +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- FlinkLogicalTableFunctionScan(invocation=[javaFunc($3)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;])
]]>
</Resource>
@@ -53,11 +52,11 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$3], y=[$4])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-FlinkLogicalCalc(select=[a, b, c, f00 AS f0, f10 AS f1])
+FlinkLogicalCalc(select=[a, b, c, f00 AS f0, f1])
+- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 1, 2}])
- :- FlinkLogicalCalc(select=[a, b, c, *($cor0.a, $cor0.a) AS f0, $cor0.b AS f1, $cor0.c AS f2])
+ :- FlinkLogicalCalc(select=[a, b, c, *(a, a) AS f0])
: +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- FlinkLogicalTableFunctionScan(invocation=[func($3, pyFunc($4, $5))], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
+ +- FlinkLogicalTableFunctionScan(invocation=[func($3, pyFunc($1, $2))], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
]]>
</Resource>
</TestCase>
@@ -78,7 +77,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$4])
FlinkLogicalCalc(select=[a, b, c, f00 AS x])
+- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
:- FlinkLogicalCalc(select=[a, b, c, d, pyFunc(f0) AS f0])
- : +- FlinkLogicalCalc(select=[a, b, c, d, $cor0.d._1 AS f0])
+ : +- FlinkLogicalCalc(select=[a, b, c, d, d._1 AS f0])
: +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+- FlinkLogicalTableFunctionScan(invocation=[javaFunc($4)], rowType=[RecordType(VARCHAR(2147483647) f0)], elementType=[class [Ljava.lang.Object;])
]]>
@@ -100,9 +99,9 @@ LogicalProject(a=[$0], b=[$1], c=[$2], x=[$4], y=[$5])
<![CDATA[
FlinkLogicalCalc(select=[a, b, c, f00 AS x, f10 AS y])
+- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2, 3}])
- :- FlinkLogicalCalc(select=[a, b, c, d, *($cor0.d._1, $cor0.a) AS f0, $cor0.d._2 AS f1, $cor0.c AS f2])
+ :- FlinkLogicalCalc(select=[a, b, c, d, *(d._1, a) AS f0, d._2 AS f1])
: +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
- +- FlinkLogicalTableFunctionScan(invocation=[func($4, pyFunc($5, $6))], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
+ +- FlinkLogicalTableFunctionScan(invocation=[func($4, pyFunc($5, $2))], rowType=[RecordType(INTEGER f0, INTEGER f1)], elementType=[class [Ljava.lang.Object;])
]]>
</Resource>
</TestCase>