You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/23 13:18:14 UTC
[flink] 02/02: [FLINK-26518][table] Support BridgingSqlFunction with SqlTableFunction for Scala implicits
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit aea176e1fc15a9cf0388e8b32ce3bb3688e876bf
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Mar 9 15:38:52 2022 +0100
[FLINK-26518][table] Support BridgingSqlFunction with SqlTableFunction for Scala implicits
This closes #19137.
---
.../operations/utils/CalculatedTableFactory.java | 6 ++-
.../table/api/ImplicitExpressionConversions.scala | 8 +--
.../calcite/sql/validate/ProcedureNamespace.java | 35 ++++++-------
.../table/planner/calcite/FlinkRelBuilder.java | 59 ++++++++++++++++++++++
.../functions/bridging/BridgingSqlFunction.java | 39 +++++++++++++-
.../planner/plan/QueryOperationConverter.java | 12 +++--
.../planner/plan/utils/SetOpRewriteUtil.scala | 12 +++--
.../planner/runtime/stream/sql/FunctionITCase.java | 33 +++++-------
.../runtime/stream/table/FunctionITCase.java | 39 +++++++-------
.../utils/JavaUserDefinedTableFunctions.java | 6 ++-
.../planner/plan/batch/sql/SetOperatorsTest.xml | 8 +--
.../planner/plan/batch/table/CorrelateTest.xml | 28 +++++-----
.../planner/plan/batch/table/SetOperatorsTest.xml | 8 +--
.../stringexpr/CorrelateStringExpressionTest.xml | 34 ++++++-------
.../planner/plan/common/PartialInsertTest.xml | 16 +++---
.../rules/logical/RewriteIntersectAllRuleTest.xml | 12 ++---
.../plan/rules/logical/RewriteMinusAllRuleTest.xml | 12 ++---
.../planner/plan/stream/sql/SetOperatorsTest.xml | 8 +--
.../plan/stream/table/ColumnFunctionsTest.xml | 4 +-
.../planner/plan/stream/table/CorrelateTest.xml | 51 +++++++++----------
.../stream/table/TemporalTableFunctionJoinTest.xml | 10 ++--
.../planner/plan/batch/sql/TableSourceTest.scala | 1 -
.../planner/plan/batch/table/CorrelateTest.scala | 12 -----
.../stringexpr/CorrelateStringExpressionTest.scala | 4 --
.../table/validation/CorrelateValidationTest.scala | 1 -
.../planner/plan/stream/table/CorrelateTest.scala | 17 -------
.../table/validation/CorrelateValidationTest.scala | 3 +-
.../runtime/batch/table/CorrelateITCase.scala | 3 +-
.../planner/runtime/utils/StreamingTestBase.scala | 3 ++
.../planner/utils/UserDefinedTableFunctions.scala | 24 +++------
30 files changed, 274 insertions(+), 234 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java
index 09b9eca..6d20abd 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.expressions.ExpressionUtils;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.operations.CalculatedQueryOperation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.types.DataType;
@@ -38,6 +39,7 @@ import java.util.Collections;
import java.util.List;
import static java.util.stream.Collectors.toList;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.isFunctionOfKind;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
/** Utility class for creating a valid {@link CalculatedQueryOperation} operation. */
@@ -89,7 +91,7 @@ final class CalculatedTableFactory {
+ alias)))
.collect(toList());
- if (!(children.get(0) instanceof CallExpression)) {
+ if (!isFunctionOfKind(children.get(0), FunctionKind.TABLE)) {
throw fail();
}
@@ -156,7 +158,7 @@ final class CalculatedTableFactory {
private ValidationException fail() {
return new ValidationException(
- "A lateral join only accepts a string expression which defines a table function "
+ "A lateral join only accepts an expression which defines a table function "
+ "call that might be followed by some alias.");
}
}
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index b2c0ca2..c36c3d1 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -158,17 +158,13 @@ trait ImplicitExpressionConversions {
}
}
- implicit class TableFunctionCall[T: TypeInformation](val t: TableFunction[T]) {
+ implicit class TableFunctionCall(val t: TableFunction[_]) {
/**
* Calls a table function for the given parameters.
*/
def apply(params: Expression*): Expression = {
- val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper
- .getReturnTypeOfTableFunction(t, implicitly[TypeInformation[T]])
- unresolvedCall(
- new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo),
- params.map(ApiExpressionUtils.objectToExpression): _*)
+ unresolvedCall(t, params.map(ApiExpressionUtils.objectToExpression): _*)
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
index cf9beec..22e9380 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
@@ -21,13 +21,12 @@ import org.apache.flink.annotation.Internal;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlTableFunction;
-import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
-import org.apache.calcite.sql.type.SqlTypeName;
+
+import static java.util.Objects.requireNonNull;
/**
* Namespace whose contents are defined by the result of a call to a user-defined procedure.
@@ -56,25 +55,23 @@ public final class ProcedureNamespace extends AbstractNamespace {
public RelDataType validateImpl(RelDataType targetRowType) {
validator.inferUnknownTypes(validator.unknownType, scope, call);
- final RelDataType type = validator.deriveTypeImpl(scope, call);
+ // The result is ignored but the type is derived to trigger the validation
+ validator.deriveTypeImpl(scope, call);
final SqlOperator operator = call.getOperator();
final SqlCallBinding callBinding = new SqlCallBinding(validator, scope, call);
- if (operator instanceof SqlTableFunction) {
- final SqlTableFunction tableFunction = (SqlTableFunction) operator;
- if (type.getSqlTypeName() != SqlTypeName.CURSOR) {
- throw new IllegalArgumentException(
- "Table function should have CURSOR " + "type, not " + type);
- }
- final SqlReturnTypeInference rowTypeInference = tableFunction.getRowTypeInference();
- RelDataType retType = rowTypeInference.inferReturnType(callBinding);
- return validator.getTypeFactory().createTypeWithNullability(retType, false);
- }
-
- // special handling of collection tables TABLE(function(...))
- if (SqlUtil.stripAs(enclosingNode).getKind() == SqlKind.COLLECTION_TABLE) {
- return toStruct(type, getNode());
+ if (!(operator instanceof SqlTableFunction)) {
+ throw new IllegalArgumentException(
+ "Argument must be a table function: " + operator.getNameAsId());
}
- return type;
+ final SqlTableFunction tableFunction = (SqlTableFunction) operator;
+ final SqlReturnTypeInference rowTypeInference = tableFunction.getRowTypeInference();
+ final RelDataType rowRelDataType =
+ requireNonNull(
+ rowTypeInference.inferReturnType(callBinding),
+ () -> "got null from inferReturnType for call " + callBinding.getCall());
+ // For BridgingSqlFunction the type can still be atomic
+ // and will be wrapped with a proper field alias
+ return toStruct(rowRelDataType, getNode());
}
/** Converts a type to a struct if it is not already. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
index 35ab473..c5b774d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.calcite.FlinkRelFactories.ExpandFactory;
import org.apache.flink.table.planner.calcite.FlinkRelFactories.RankFactory;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.hint.FlinkHints;
import org.apache.flink.table.planner.plan.QueryOperationConverter;
import org.apache.flink.table.planner.plan.logical.LogicalWindow;
@@ -33,6 +34,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggre
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
@@ -40,6 +42,7 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable.ToRelContext;
import org.apache.calcite.plan.ViewExpanders;
@@ -48,18 +51,26 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isTableAggregate;
@@ -105,6 +116,54 @@ public final class FlinkRelBuilder extends RelBuilder {
};
}
+ /**
+ * {@link RelBuilder#functionScan(SqlOperator, int, Iterable)} cannot work smoothly with aliases
+ * which is why we implement a custom one. The method is static because some {@link RelOptRule}s
+ * don't use {@link FlinkRelBuilder}.
+ */
+ public static RelBuilder pushFunctionScan(
+ RelBuilder relBuilder,
+ SqlOperator operator,
+ int inputCount,
+ Iterable<RexNode> operands,
+ List<String> aliases) {
+ Preconditions.checkArgument(
+ operator instanceof BridgingSqlFunction.WithTableFunction,
+ "Table function expected.");
+ final RexBuilder rexBuilder = relBuilder.getRexBuilder();
+ final RelDataTypeFactory typeFactory = relBuilder.getTypeFactory();
+
+ final List<RelNode> inputs = new LinkedList<>();
+ for (int i = 0; i < inputCount; i++) {
+ inputs.add(0, relBuilder.build());
+ }
+
+ final List<RexNode> operandList = CollectionUtil.iterableToList(operands);
+
+ final RelDataType functionRelDataType = rexBuilder.deriveReturnType(operator, operandList);
+ final List<RelDataType> fieldRelDataTypes;
+ if (functionRelDataType.isStruct()) {
+ fieldRelDataTypes =
+ functionRelDataType.getFieldList().stream()
+ .map(RelDataTypeField::getType)
+ .collect(Collectors.toList());
+ } else {
+ fieldRelDataTypes = Collections.singletonList(functionRelDataType);
+ }
+ final RelDataType rowRelDataType = typeFactory.createStructType(fieldRelDataTypes, aliases);
+
+ final RexNode call = rexBuilder.makeCall(rowRelDataType, operator, operandList);
+ final RelNode functionScan =
+ LogicalTableFunctionScan.create(
+ relBuilder.getCluster(),
+ inputs,
+ call,
+ null,
+ rowRelDataType,
+ Collections.emptySet());
+ return relBuilder.push(functionScan);
+ }
+
public RelBuilder expand(List<List<RexNode>> projects, int expandIdIndex) {
final RelNode input = build();
final RelNode expand = expandFactory.createExpand(input, projects, expandIdIndex);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
index 5dbe4d8..d661787 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.types.DataType;
@@ -35,6 +36,10 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlTableFunction;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.tools.RelBuilder;
import java.util.List;
@@ -52,7 +57,7 @@ import static org.apache.flink.util.Preconditions.checkState;
* (either a system or user-defined function).
*/
@Internal
-public final class BridgingSqlFunction extends SqlFunction {
+public class BridgingSqlFunction extends SqlFunction {
private final DataTypeFactory dataTypeFactory;
@@ -108,6 +113,10 @@ public final class BridgingSqlFunction extends SqlFunction {
functionKind == FunctionKind.SCALAR || functionKind == FunctionKind.TABLE,
"Scalar or table function kind expected.");
+ if (functionKind == FunctionKind.TABLE) {
+ return new BridgingSqlFunction.WithTableFunction(
+ dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference);
+ }
return new BridgingSqlFunction(
dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference);
}
@@ -177,4 +186,32 @@ public final class BridgingSqlFunction extends SqlFunction {
public boolean isDeterministic() {
return resolvedFunction.getDefinition().isDeterministic();
}
+
+ // --------------------------------------------------------------------------------------------
+ // Table function extension
+ // --------------------------------------------------------------------------------------------
+
+ /** Special flavor of {@link BridgingSqlFunction} to indicate a table function to Calcite. */
+ public static class WithTableFunction extends BridgingSqlFunction implements SqlTableFunction {
+
+ private WithTableFunction(
+ DataTypeFactory dataTypeFactory,
+ FlinkTypeFactory typeFactory,
+ SqlKind kind,
+ ContextResolvedFunction resolvedFunction,
+ TypeInference typeInference) {
+ super(dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference);
+ }
+
+ /**
+ * The conversion to a row type is handled on the caller side. This allows us to perform it
+ * SQL/Table API-specific. This is in particular important to set the aliases of fields
+ * correctly (see {@link FlinkRelBuilder#pushFunctionScan(RelBuilder, SqlOperator, int,
+ * Iterable, List)}).
+ */
+ @Override
+ public SqlReturnTypeInference getRowTypeInference() {
+ return getReturnTypeInference();
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index 837b3b2..3fcf62fa 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -305,10 +305,14 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
final BridgingSqlFunction sqlFunction =
BridgingSqlFunction.of(relBuilder.getCluster(), resolvedFunction);
- return relBuilder
- .functionScan(sqlFunction, 0, parameters)
- .rename(calculatedTable.getResolvedSchema().getColumnNames())
- .build();
+ FlinkRelBuilder.pushFunctionScan(
+ relBuilder,
+ sqlFunction,
+ 0,
+ parameters,
+ calculatedTable.getResolvedSchema().getColumnNames());
+
+ return relBuilder.build();
}
private RelNode convertLegacyTableFunction(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala
index eb6d422..32e400f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.utils
import org.apache.flink.table.functions.BuiltInFunctionDefinitions
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
import org.apache.calcite.plan.RelOptUtil
@@ -73,12 +74,15 @@ object SetOpRewriteUtil {
val cluster = relBuilder.getCluster
val sqlFunction = BridgingSqlFunction.of(
- relBuilder.getCluster,
+ cluster,
BuiltInFunctionDefinitions.INTERNAL_REPLICATE_ROWS)
- relBuilder
- .functionScan(sqlFunction, 0, relBuilder.fields(Util.range(fields.size() + 1)))
- .rename(outputRelDataType.getFieldNames)
+ FlinkRelBuilder.pushFunctionScan(
+ relBuilder,
+ sqlFunction,
+ 0,
+ relBuilder.fields(Util.range(fields.size() + 1)),
+ outputRelDataType.getFieldNames)
// correlated join
val corSet = Collections.singleton(cluster.createCorrel())
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index d39d180..ed19aba 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -64,6 +64,7 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -925,16 +926,12 @@ public class FunctionITCase extends StreamingTestBase {
public void testInvalidUseOfSystemScalarFunction() {
tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
- try {
- tEnv().explainSql("INSERT INTO SinkTable " + "SELECT * FROM TABLE(MD5('3'))");
- fail();
- } catch (ValidationException e) {
- assertThat(
- e,
- hasMessage(
- containsString(
- "Currently, only table functions can be used in a correlate operation.")));
- }
+ assertThatThrownBy(
+ () ->
+ tEnv().explainSql(
+ "INSERT INTO SinkTable "
+ + "SELECT * FROM TABLE(MD5('3'))"))
+ .hasMessageContaining("Argument must be a table function: MD5");
}
@Test
@@ -946,16 +943,12 @@ public class FunctionITCase extends StreamingTestBase {
tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);
- try {
- tEnv().explainSql("INSERT INTO SinkTable " + "SELECT RowTableFunction('test')");
- fail();
- } catch (ValidationException e) {
- assertThat(
- e,
- hasMessage(
- containsString(
- "Currently, only scalar functions can be used in a projection or filter operation.")));
- }
+ assertThatThrownBy(
+ () ->
+ tEnv().explainSql(
+ "INSERT INTO SinkTable "
+ + "SELECT RowTableFunction('test')"))
+ .hasMessageContaining("Cannot call table function here: 'RowTableFunction'");
}
@Test
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
index 6891d62..06af8ef 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
@@ -28,27 +28,23 @@ import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.types.Row;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
-import static org.hamcrest.CoreMatchers.containsString;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
-import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
/** Tests for user defined functions in the Table API. */
public class FunctionITCase extends StreamingTestBase {
- @Rule public ExpectedException thrown = ExpectedException.none();
-
@Test
- public void testScalarFunction() throws Exception {
+ void testScalarFunction() throws Exception {
final List<Row> sourceData =
Arrays.asList(Row.of(1, 1L, 1L), Row.of(2, 2L, 1L), Row.of(3, 3L, 1L));
@@ -61,7 +57,7 @@ public class FunctionITCase extends StreamingTestBase {
tEnv().executeSql(
"CREATE TABLE TestTable(a INT, b BIGINT, c BIGINT) WITH ('connector' = 'COLLECTION')");
- Table table =
+ final Table table =
tEnv().from("TestTable")
.select(
$("a"),
@@ -75,7 +71,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testJoinWithTableFunction() throws Exception {
+ void testJoinWithTableFunction() throws Exception {
final List<Row> sourceData =
Arrays.asList(
Row.of("1,2,3"), Row.of("2,3,4"), Row.of("3,4,5"), Row.of((String) null));
@@ -103,23 +99,22 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testLateralJoinWithScalarFunction() throws Exception {
- thrown.expect(ValidationException.class);
- thrown.expect(
- hasMessage(
- containsString(
- "Currently, only table functions can be used in a correlate operation.")));
-
+ void testLateralJoinWithScalarFunction() throws Exception {
TestCollectionTableFactory.reset();
tEnv().executeSql("CREATE TABLE SourceTable(s STRING) WITH ('connector' = 'COLLECTION')");
tEnv().executeSql(
"CREATE TABLE SinkTable(s STRING, sa ARRAY<STRING>) WITH ('connector' = 'COLLECTION')");
- tEnv().from("SourceTable")
- .joinLateral(call(new RowScalarFunction(), $("s")).as("a", "b"))
- .select($("a"), $("b"))
- .executeInsert("SinkTable")
- .await();
+ assertThatThrownBy(
+ () -> {
+ tEnv().from("SourceTable")
+ .joinLateral(
+ call(new RowScalarFunction(), $("s")).as("a", "b"));
+ })
+ .satisfies(
+ anyCauseMatches(
+ ValidationException.class,
+ "A lateral join only accepts an expression which defines a table function"));
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
index 95f6e51..9d7d559 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.runtime.utils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.functions.TableFunction;
@@ -34,7 +35,10 @@ public class JavaUserDefinedTableFunctions {
/** Emit inputs as long. */
public static class JavaTableFunc0 extends TableFunction<Long> {
- public void eval(Integer a, Long b, TimestampData c) {
+ public void eval(
+ @DataTypeHint("DATE") Integer a,
+ Long b,
+ @DataTypeHint("TIMESTAMP(0)") TimestampData c) {
collect(a.longValue());
collect(b);
collect(c.getMillisecond());
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
index d34b6e8..7b89148 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
@@ -57,8 +57,8 @@ LogicalIntersect(all=[true])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,f0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,c0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
+- Calc(select=[IF((vcol_left_cnt > vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, c], where=[((vcol_left_cnt >= 1) AND (vcol_right_cnt >= 1))])
+- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_COUNT(count$0) AS vcol_left_cnt, Final_COUNT(count$1) AS vcol_right_cnt])
+- Exchange(distribution=[hash[c]])
@@ -180,8 +180,8 @@ LogicalMinus(all=[true])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,f0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,c0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
+- Calc(select=[sum_vcol_marker, c], where=[(sum_vcol_marker > 0)])
+- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_vcol_marker])
+- Exchange(distribution=[hash[c]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml
index d6347f7..a2d90a0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml
@@ -26,13 +26,13 @@ LogicalProject(c=[$0], d=[$1])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalProject(a=[$0], b=[$1], c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -42,12 +42,12 @@ Calc(select=[c, d])
<![CDATA[
LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
+PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(*org.apache.flink.table.planner.utils.MockPythonTableFunction*(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -62,13 +62,13 @@ LogicalProject(c=[$0], d=[$1])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalProject(a=[$0], b=[$1], c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -79,13 +79,13 @@ Calc(select=[c, d])
LogicalProject(c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -96,13 +96,13 @@ Calc(select=[c, s])
LogicalProject(c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -114,13 +114,13 @@ LogicalFilter(condition=[>($1, _UTF-16LE'')])
+- LogicalProject(c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, s], where=[(s > '')])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -131,13 +131,13 @@ Calc(select=[c, s], where=[(s > '')])
LogicalProject(c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml
index 598ee0e..d31bda9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml
@@ -33,8 +33,8 @@ Calc(select=[EXPR$0 AS a, b, EXPR$1 AS c])
+- HashAggregate(isMerge=[true], groupBy=[b], select=[b, Final_SUM(sum$0) AS EXPR$0, Final_COUNT(count$1) AS EXPR$1])
+- Exchange(distribution=[hash[b]])
+- LocalHashAggregate(groupBy=[b], select=[b, Partial_SUM(a) AS sum$0, Partial_COUNT(c) AS count$1])
- +- Calc(select=[f0 AS a, f1 AS b, f2 AS c])
- +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,b,c))], select=[sum_vcol_marker,a,b,c,f0,f1,f2], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)], joinType=[INNER])
+ +- Calc(select=[a0 AS a, b0 AS b, c0 AS c])
+ +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,b,c))], select=[sum_vcol_marker,a,b,c,a0,b0,c0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER a0, BIGINT b0, VARCHAR(2147483647) c0)], joinType=[INNER])
+- Calc(select=[sum_vcol_marker, a, b, c], where=[(sum_vcol_marker > 0)])
+- HashAggregate(isMerge=[true], groupBy=[a, b, c], select=[a, b, c, Final_SUM(sum$0) AS sum_vcol_marker])
+- Exchange(distribution=[hash[a, b, c]])
@@ -123,8 +123,8 @@ LogicalProject(b=[$1], c=[$2])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[f0 AS b, f1 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,b,c))], select=[sum_vcol_marker,b,c,f0,f1], rowType=[RecordType(BIGINT sum_vcol_marker, BIGINT b, VARCHAR(2147483647) c, BIGINT f0, VARCHAR(2147483647) f1)], joinType=[INNER])
+Calc(select=[b0 AS b, c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,b,c))], select=[sum_vcol_marker,b,c,b0,c0], rowType=[RecordType(BIGINT sum_vcol_marker, BIGINT b, VARCHAR(2147483647) c, BIGINT b0, VARCHAR(2147483647) c0)], joinType=[INNER])
+- Calc(select=[sum_vcol_marker, b, c], where=[(sum_vcol_marker > 0)])
+- HashAggregate(isMerge=[true], groupBy=[b, c], select=[b, c, Final_SUM(sum$0) AS sum_vcol_marker])
+- Exchange(distribution=[hash[b, c]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml
index 09262da..e698687 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml
@@ -22,13 +22,13 @@ limitations under the License.
LogicalProject(c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -39,13 +39,13 @@ Calc(select=[c, s])
LogicalProject(c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -56,13 +56,13 @@ Calc(select=[c, s])
LogicalProject(c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -73,13 +73,13 @@ Calc(select=[c, s])
LogicalProject(c=[$2], name=[$3], len=[$4])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, name, len])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -90,13 +90,13 @@ Calc(select=[c, name, len])
LogicalProject(c=[$2], name=[$3], len=[$5], adult=[$4])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, name, len, adult])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(*org.apache.flink.table.planner.utils.HierarchyTableFunction*(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -104,16 +104,16 @@ Calc(select=[c, name, len, adult])
<TestCase name="testCorrelateJoins6">
<Resource name="ast">
<![CDATA[
-LogicalProject(c=[$2], name=[$4], age=[$3])
+LogicalProject(c=[$2], name=[$3], age=[$4])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, name, age])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(*org.apache.flink.table.planner.utils.PojoTableFunc*(c))], select=[a,b,c,name,age], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -125,13 +125,13 @@ LogicalFilter(condition=[>($2, 2)])
+- LogicalProject(c=[$2], name=[$3], len=[$4])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, name, len])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -142,13 +142,13 @@ Calc(select=[c, name, len])
LogicalProject(a=[$0], c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
index 067ed8e..1ede5ab 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
@@ -127,8 +127,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
-+- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
- +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+ +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
+- Calc(select=[sum_vcol_marker, a, c, d, e, f, g], where=[>(sum_vcol_marker, 0)])
+- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, SUM_RETRACT(vcol_marker) AS sum_vcol_marker])
+- Exchange(distribution=[hash[a, c, d, e, f, g]])
@@ -165,8 +165,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
<![CDATA[
Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
+- Sort(orderBy=[c ASC, d ASC])
- +- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
- +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
+ +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+ +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
+- Calc(select=[sum_vcol_marker, a, c, d, e, f, g], where=[>(sum_vcol_marker, 0)])
+- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, Final_SUM(sum$0) AS sum_vcol_marker])
+- Exchange(distribution=[hash[a, c, d, e, f, g]])
@@ -205,8 +205,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
-+- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
- +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+ +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
+- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1), >=(vcol_right_cnt, 1))])
+- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, COUNT_RETRACT(vcol_left_marker) AS vcol_left_cnt, COUNT_RETRACT(vcol_right_marker) AS vcol_right_cnt])
+- Exchange(distribution=[hash[a, c, d, e, f, g]])
@@ -243,8 +243,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
<![CDATA[
Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
+- Sort(orderBy=[c ASC, d ASC])
- +- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
- +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
+ +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+ +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
+- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1), >=(vcol_right_cnt, 1))])
+- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, Final_COUNT(count$0) AS vcol_left_cnt, Final_COUNT(count$1) AS vcol_right_cnt])
+- Exchange(distribution=[hash[a, c, d, e, f, g]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml
index c5a3d2f..ad832c4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml
@@ -43,8 +43,7 @@ LogicalProject(c=[$2])
: +- LogicalProject(f=[$0], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true])
: +- LogicalProject(f=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(c=[$0])
- +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
]]>
</Resource>
</TestCase>
@@ -77,8 +76,7 @@ LogicalProject(c=[$2])
: +- LogicalProject(f=[$0], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true])
: +- LogicalProject(f=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(c=[$0])
- +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
]]>
</Resource>
</TestCase>
@@ -113,8 +111,7 @@ LogicalProject(c=[$2])
: +- LogicalProject(d=[$0], e=[$1], f=[$2], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true])
: +- LogicalProject(d=[$0], e=[$1], f=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType:peek_no_expand(INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)])
+ +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)])
]]>
</Resource>
</TestCase>
@@ -147,8 +144,7 @@ LogicalProject(c=[$2])
: +- LogicalProject(f=[$2])
: +- LogicalFilter(condition=[=(1, 0)])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(c=[$0])
- +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml
index ae6d9dc..e895649 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml
@@ -43,8 +43,7 @@ LogicalProject(c=[$2])
: +- LogicalProject(f=[$0], vcol_marker=[-1:BIGINT])
: +- LogicalProject(f=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(c=[$0])
- +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
]]>
</Resource>
</TestCase>
@@ -77,8 +76,7 @@ LogicalProject(c=[$2])
: +- LogicalProject(f=[$0], vcol_marker=[-1:BIGINT])
: +- LogicalProject(f=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(c=[$0])
- +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
]]>
</Resource>
</TestCase>
@@ -111,8 +109,7 @@ LogicalProject(c=[$2])
: +- LogicalProject(f=[$2])
: +- LogicalFilter(condition=[=(1, 0)])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(c=[$0])
- +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
]]>
</Resource>
</TestCase>
@@ -147,8 +144,7 @@ LogicalProject(c=[$2])
: +- LogicalProject(d=[$0], e=[$1], f=[$2], vcol_marker=[-1:BIGINT])
: +- LogicalProject(d=[$0], e=[$1], f=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType:peek_no_expand(INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)])
+ +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
index 59d848b..61552b1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
@@ -58,8 +58,8 @@ LogicalIntersect(all=[true])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,f0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,c0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
+- Calc(select=[IF((vcol_left_cnt > vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, c], where=[((vcol_left_cnt >= 1) AND (vcol_right_cnt >= 1))])
+- GroupAggregate(groupBy=[c], select=[c, COUNT(vcol_left_marker) AS vcol_left_cnt, COUNT(vcol_right_marker) AS vcol_right_cnt])
+- Exchange(distribution=[hash[c]])
@@ -182,8 +182,8 @@ LogicalMinus(all=[true])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,f0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,c0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
+- Calc(select=[sum_vcol_marker, c], where=[(sum_vcol_marker > 0)])
+- GroupAggregate(groupBy=[c], select=[c, SUM(vcol_marker) AS sum_vcol_marker])
+- Exchange(distribution=[hash[c]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml
index 1c8dab3..063fcca 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml
@@ -153,12 +153,12 @@ Join(joinType=[InnerJoin], where=[(int1 = int2)], select=[int1, long1, string1,
<![CDATA[
LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(int, long, string)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(string))], select=[int,long,string,name,age], rowType=[RecordType(DOUBLE int, BIGINT long, VARCHAR(2147483647) string, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(string))], select=[int,long,string,name,age], rowType=[RecordType(DOUBLE int, BIGINT long, VARCHAR(2147483647) string, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(int, long, string)]]], fields=[int, long, string])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
index cbee6a2..2542e8d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
@@ -21,12 +21,12 @@ limitations under the License.
<![CDATA[
LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
+PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(*org.apache.flink.table.planner.utils.MockPythonTableFunction*(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -63,13 +63,13 @@ LogicalProject(c=[$0], d=[$1])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalProject(a=[$0], b=[$1], c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -84,13 +84,13 @@ LogicalProject(c=[$0], d=[$1])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalProject(a=[$0], b=[$1], c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -101,13 +101,13 @@ Calc(select=[c, d])
LogicalProject(c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -118,13 +118,13 @@ Calc(select=[c, s])
LogicalProject(c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -135,13 +135,13 @@ Calc(select=[c, s])
LogicalProject(c=[$2], name=[$3], len=[$4])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, name, len])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], correlate=[table(TableFunc2(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*(c)))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*(c)))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -153,13 +153,13 @@ LogicalFilter(condition=[>($2, 2)])
+- LogicalProject(c=[$2], name=[$3], len=[$4])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, name, len])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -170,14 +170,13 @@ Calc(select=[c, name, len])
LogicalProject(f0=[AS($3, _UTF-16LE'f0')], f1=[AS($4, _UTF-16LE'f1')])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]])
- +- LogicalProject(f0=[$0], f1_0=[$1])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1)])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) f0, INTEGER f1_0)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[f0, f10 AS f1])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(f3))], select=[f1,f2,f3,f0,f10], rowType=[RecordType(INTEGER f1, BIGINT f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f10)], joinType=[INNER])
+Calc(select=[f0, f1_0 AS f1])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(f3))], select=[f1,f2,f3,f0,f1_0], rowType=[RecordType(INTEGER f1, BIGINT f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f1_0)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]], fields=[f1, f2, f3])
]]>
</Resource>
@@ -209,12 +208,12 @@ Correlate(invocation=[str_split(_UTF-16LE'Jack,John')], correlate=[table(str_spl
<![CDATA[
LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(*org.apache.flink.table.planner.utils.HierarchyTableFunction*(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -291,13 +290,13 @@ Correlate(invocation=[str_split(_UTF-16LE'Jack,John', _UTF-16LE',')], correlate=
LogicalProject(c=[$2], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -307,12 +306,12 @@ Calc(select=[c, s])
<![CDATA[
LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -322,12 +321,12 @@ Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRIN
<![CDATA[
LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(*org.apache.flink.table.planner.utils.PojoTableFunc*(c))], select=[a,b,c,name,age], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml
index 0525902..47f118e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml
@@ -25,7 +25,7 @@ LogicalJoin(condition=[=($3, $1)], joinType=[inner])
: +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
: :- LogicalProject(o_rowtime=[AS($0, _UTF-16LE'o_rowtime')], o_comment=[AS($1, _UTF-16LE'o_comment')], o_amount=[AS($2, _UTF-16LE'o_amount')], o_currency=[AS($3, _UTF-16LE'o_currency')], o_secondary_key=[AS($4, _UTF-16LE'o_secondary_key')])
: : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
-: +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$bbf912e58a3fb2d083552961c0f87dbe*($0)], rowType=[RecordType(TIMESTAMP(3) *ROWTIME* rowtime, VARCHAR(2147483647) comment, VARCHAR(2147483647) currency, INTEGER rate, INTEGER secondary_key)], elementType=[class [Ljava.lang.Object;])
+: +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$bbf912e58a3fb2d083552961c0f87dbe*($0)], rowType=[RecordType(TIMESTAMP(3) *ROWTIME* rowtime, VARCHAR(2147483647) comment, VARCHAR(2147483647) currency, INTEGER rate, INTEGER secondary_key)])
+- LogicalTableScan(table=[[default_catalog, default_database, ThirdTable]])
]]>
</Resource>
@@ -53,7 +53,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
+- LogicalFilter(condition=[=($3, $1)])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -74,7 +74,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
+- LogicalFilter(condition=[=($3, $1)])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
- +- LogicalTableFunctionScan(invocation=[Rates($2)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
+ +- LogicalTableFunctionScan(invocation=[Rates($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -95,7 +95,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
+- LogicalFilter(condition=[=($3, $1)])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, ProctimeOrders]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$170cc46c47df69784f267e43f61e8e9d*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP_LTZ(3) *PROCTIME* proctime)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$170cc46c47df69784f267e43f61e8e9d*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP_LTZ(3) *PROCTIME* proctime)])
]]>
</Resource>
<Resource name="optimized exec plan">
@@ -117,7 +117,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
+- LogicalFilter(condition=[=($3, $1)])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
:- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
- +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)], elementType=[class [Ljava.lang.Object;])
+ +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
]]>
</Resource>
<Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
index ca28de3..cddc89d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.planner.plan.batch.sql
-import org.apache.flink.table.api.config.TableConfigOptions
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
import org.apache.flink.table.planner.utils._
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
index 14348ae..8ebaee6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
@@ -34,8 +34,6 @@ class CorrelateTest extends TableTestBase {
val util = batchTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val func = new TableFunc1
- util.addFunction("func1", func)
-
val result1 = table.joinLateral(func('c) as 's).select('c, 's)
util.verifyExecPlan(result1)
@@ -46,8 +44,6 @@ class CorrelateTest extends TableTestBase {
val util = batchTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val func = new TableFunc1
- util.addFunction("func1", func)
-
val result2 = table.joinLateral(func('c, "$") as 's).select('c, 's)
util.verifyExecPlan(result2)
}
@@ -57,8 +53,6 @@ class CorrelateTest extends TableTestBase {
val util = batchTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val func = new TableFunc1
- util.addFunction("func1", func)
-
val result = table.leftOuterJoinLateral(func('c) as 's).select('c, 's).where('s > "")
util.verifyExecPlan(result)
}
@@ -68,8 +62,6 @@ class CorrelateTest extends TableTestBase {
val util = batchTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val func = new TableFunc1
- util.addFunction("func1", func)
-
val result = table.leftOuterJoinLateral(func('c) as 's, true).select('c, 's)
util.verifyExecPlan(result)
}
@@ -79,8 +71,6 @@ class CorrelateTest extends TableTestBase {
val util = batchTestUtil()
val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val func = new TableFunc0
- util.addFunction("func1", func)
-
val result = sourceTable.select('a, 'b, 'c)
.joinLateral(func('c) as('d, 'e))
.select('c, 'd, 'e)
@@ -106,8 +96,6 @@ class CorrelateTest extends TableTestBase {
val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val func = new TableFunc0
- util.addFunction("func1", func)
-
val result = sourceTable.select('a, 'b, 'c)
.joinLateral(func('c) as('d, 'e))
.select('c, 'd, 'e)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala
index 95b3405..a4aa185 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala
@@ -29,9 +29,7 @@ class CorrelateStringExpressionTest extends TableTestBase {
private val util = batchTestUtil()
private val tab = util.addTableSource[(Int, Long, String)]("Table1", 'a, 'b, 'c)
private val func1 = new TableFunc1
- util.addFunction("func1", func1)
private val func2 = new TableFunc2
- util.addFunction("func2", func2)
@Test
def testCorrelateJoins1(): Unit = {
@@ -61,7 +59,6 @@ class CorrelateStringExpressionTest extends TableTestBase {
def testCorrelateJoins5(): Unit = {
// test hierarchy generic type
val hierarchy = new HierarchyTableFunction
- util.addFunction("hierarchy", hierarchy)
val scalaTable = tab.joinLateral(
hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
util.verifyExecPlan(scalaTable)
@@ -71,7 +68,6 @@ class CorrelateStringExpressionTest extends TableTestBase {
def testCorrelateJoins6(): Unit = {
// test pojo type
val pojo = new PojoTableFunc
- util.addFunction("pojo", pojo)
val scalaTable = tab.joinLateral(pojo('c)).select('c, 'name, 'age)
util.verifyExecPlan(scalaTable)
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala
index d69383e..611928e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala
@@ -36,7 +36,6 @@ class CorrelateValidationTest extends TableTestBase {
val util = batchTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val func = new TableFunc1
- util.addFunction("func1", func)
val result = table
.leftOuterJoinLateral(func('c) as 's, 'c === 's)
.select('c, 's)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
index 339e7ca..ff1be68 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
@@ -36,8 +36,6 @@ class CorrelateTest extends TableTestBase {
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = new TableFunc1
- util.addFunction("func1", function)
-
val result1 = table.joinLateral(function('c) as 's).select('c, 's)
util.verifyExecPlan(result1)
}
@@ -48,7 +46,6 @@ class CorrelateTest extends TableTestBase {
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = new TableFunc1
- util.addFunction("func1", function)
// test overloading
val result2 = table.joinLateral(function('c, "$") as 's).select('c, 's)
util.verifyExecPlan(result2)
@@ -59,8 +56,6 @@ class CorrelateTest extends TableTestBase {
val util = streamTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = new TableFunc1
- util.addFunction("func1", function)
-
val result = table.leftOuterJoinLateral(function('c) as 's, true).select('c, 's)
util.verifyExecPlan(result)
}
@@ -70,7 +65,6 @@ class CorrelateTest extends TableTestBase {
val util = streamTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = new TableFunc2
- util.addFunction("func2", function)
val scalarFunc = new Func13("pre")
val result = table.joinLateral(
@@ -84,8 +78,6 @@ class CorrelateTest extends TableTestBase {
val util = streamTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = new HierarchyTableFunction
- util.addFunction("hierarchy", function)
-
val result = table.joinLateral(function('c) as ('name, 'adult, 'len))
util.verifyExecPlan(result)
}
@@ -95,8 +87,6 @@ class CorrelateTest extends TableTestBase {
val util = streamTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = new PojoTableFunc
- util.addFunction("pojo", function)
-
val result = table.joinLateral(function('c))
util.verifyExecPlan(result)
}
@@ -106,8 +96,6 @@ class CorrelateTest extends TableTestBase {
val util = streamTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = new TableFunc2
- util.addFunction("func2", function)
-
val result = table
.joinLateral(function('c) as ('name, 'len))
.select('c, 'name, 'len)
@@ -120,8 +108,6 @@ class CorrelateTest extends TableTestBase {
val util = streamTestUtil()
val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = new TableFunc1
- util.addFunction("func1", function)
-
val result = table.joinLateral(function('c.substring(2)) as 's)
util.verifyExecPlan(result)
}
@@ -131,8 +117,6 @@ class CorrelateTest extends TableTestBase {
val util = streamTestUtil()
val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = new TableFunc0
- util.addFunction("func1", function)
-
val result = sourceTable.select('a, 'b, 'c)
.joinLateral(function('c) as('d, 'e))
.select('c, 'd, 'e)
@@ -158,7 +142,6 @@ class CorrelateTest extends TableTestBase {
val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
val function = new TableFunc0
- util.addFunction("func1", function)
val result = sourceTable.select('a, 'b, 'c)
.joinLateral(function('c) as('d, 'e))
.select('c, 'd, 'e)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala
index ec2401f..a991cc5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala
@@ -68,11 +68,10 @@ class CorrelateValidationTest extends TableTestBase {
util.addFunction("func0", Func0)
// SQL API call
- // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
expectExceptionThrown(
util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
null,
- classOf[AssertionError])
+ classOf[ValidationException])
//========== throw exception when the parameters is not correct ===============
// Java Table API call
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
index 17a35c2..a125f93 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
@@ -222,7 +222,8 @@ class CorrelateITCase extends BatchTestBase {
'a.cast(DataTypes.TINYINT) as 'a,
'a.cast(DataTypes.SMALLINT) as 'b,
'b.cast(DataTypes.FLOAT) as 'c)
- .joinLateral(tFunc('a, 'b, 'c) as ('a2, 'b2, 'c2))
+ .joinLateral(
+ tFunc('a.ifNull(0.toByte), 'b.ifNull(0.toShort), 'c.ifNull(0.toFloat)) as ('a2, 'b2, 'c2))
val results = executeQuery(result)
val expected = Seq(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
index 9f106c4..683543b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.rules.{ExpectedException, TemporaryFolder}
import org.junit.{After, Before, Rule}
@@ -44,6 +45,7 @@ class StreamingTestBase extends AbstractTestBase {
def tempFolder: TemporaryFolder = _tempFolder
@Before
+ @BeforeEach
def before(): Unit = {
this.env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
@@ -55,6 +57,7 @@ class StreamingTestBase extends AbstractTestBase {
}
@After
+ @AfterEach
def after(): Unit = {
StreamTestSink.clear()
TestValuesTableFactory.clearAllData()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
index f087d5a..6fcad3d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
@@ -29,8 +29,6 @@ import org.apache.flink.types.Row
import org.junit.Assert
-import java.lang.Boolean
-
import scala.annotation.varargs
@@ -117,9 +115,10 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
}
@SerialVersionUID(1L)
+@DataTypeHint("ROW<x INT, y INT>")
class MockPythonTableFunction extends TableFunction[Row] with PythonFunction {
- def eval(x: Int, y: Int) = ???
+ def eval(x: java.lang.Integer, y: java.lang.Integer) = ???
override def getResultType: TypeInformation[Row] =
new RowTypeInfo(Types.INT, Types.INT)
@@ -368,28 +367,19 @@ class MockPythonTableFunction extends TableFunction[Row] with PythonFunction {
//}
@SerialVersionUID(1L)
+@DataTypeHint("ROW<f0 STRING, f1 STRING, f2 STRING>")
class TableFunc4 extends TableFunction[Row] {
def eval(b: Byte, s: Short, f: Float): Unit = {
collect(Row.of("Byte=" + b, "Short=" + s, "Float=" + f))
}
-
- override def getResultType: TypeInformation[Row] = {
- new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING)
- }
}
@SerialVersionUID(1L)
+@DataTypeHint("ROW<a INT, b INT, c INT>")
class TableFunc6 extends TableFunction[Row] {
- def eval(row: Row): Unit = {
+ def eval(@DataTypeHint("ROW<a INT, b INT, c INT>") row: Row): Unit = {
collect(row)
}
-
- override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
- Array(new RowTypeInfo(Types.INT, Types.INT, Types.INT))
-
- override def getResultType: TypeInformation[Row] = {
- new RowTypeInfo(Types.INT, Types.INT, Types.INT)
- }
}
@SerialVersionUID(1L)
@@ -421,12 +411,12 @@ class VarArgsFunc0 extends TableFunction[String] {
}
@SerialVersionUID(1L)
-class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
+class HierarchyTableFunction extends SplittableTableFunction[java.lang.Boolean, Integer] {
def eval(user: String) {
if (user.contains("#")) {
val splits = user.split("#")
val age = splits(1).toInt
- collect(new Tuple3[String, Boolean, Integer](splits(0), age >= 20, age))
+ collect(new Tuple3[String, java.lang.Boolean, Integer](splits(0), age >= 20, age))
}
}
}