You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/02/23 08:17:44 UTC
[flink] branch master updated: [FLINK-21435][table] Use a dedicated
SQL expression in Table API
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c5ab5f8 [FLINK-21435][table] Use a dedicated SQL expression in Table API
c5ab5f8 is described below
commit c5ab5f89687a2046c7a54b9f6c4448fbebed8493
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Feb 22 14:15:04 2021 +0100
[FLINK-21435][table] Use a dedicated SQL expression in Table API
This closes #14986.
---
.../org/apache/flink/table/api/Expressions.java | 7 +-
.../table/expressions/ApiExpressionUtils.java | 4 ++
.../table/expressions/ApiExpressionVisitor.java | 4 ++
.../expressions/resolver/ExpressionResolver.java | 1 +
.../expressions/resolver/LookupCallResolver.java | 3 +-
.../resolver/rules/ResolveCallByArgumentsRule.java | 24 +------
.../resolver/rules/ResolveSqlCallRule.java | 79 ++++++++++++++++++++++
.../expressions/resolver/rules/ResolverRules.java | 4 ++
.../utils/ApiExpressionDefaultVisitor.java | 6 ++
.../flink/table/expressions/SqlCallExpression.java | 72 ++++++++++++++++++++
.../functions/BuiltInFunctionDefinitions.java | 7 --
.../expressions/PlannerExpressionConverter.scala | 3 +
.../planner/functions/BuiltInFunctionTestBase.java | 22 +++---
.../planner/functions/MiscFunctionsITCase.java | 7 +-
.../expressions/PlannerExpressionConverter.scala | 5 +-
15 files changed, 201 insertions(+), 47 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 1f5f6cc..4dfcde1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.expressions.TimePointUnit;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
@@ -540,7 +541,7 @@ public final class Expressions {
* table-valued functions are not supported. Sub-queries are also not allowed.
*/
public static ApiExpression callSql(String sqlExpression) {
- return apiCall(BuiltInFunctionDefinitions.CALL_SQL, sqlExpression);
+ return apiSqlCall(sqlExpression);
}
private static ApiExpression apiCall(FunctionDefinition functionDefinition, Object... args) {
@@ -568,4 +569,8 @@ public final class Expressions {
.collect(Collectors.toList());
return new ApiExpression(unresolvedCall(functionDefinition, arguments));
}
+
+ private static ApiExpression apiSqlCall(String sqlExpression) {
+ return new ApiExpression(new SqlCallExpression(sqlExpression));
+ }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index 356bf24..2d87882 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -304,6 +304,10 @@ public final class ApiExpressionUtils {
.collect(Collectors.toList()));
}
+ public static SqlCallExpression sqlCall(String sqlExpression) {
+ return new SqlCallExpression(sqlExpression);
+ }
+
public static Expression toMonthInterval(Expression e, int multiplier) {
return ExpressionUtils.extractValue(e, BigDecimal.class)
.map((v) -> intervalOfMonths(v.intValue() * multiplier))
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java
index b0c2bc6..f0b34bb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java
@@ -35,6 +35,8 @@ public abstract class ApiExpressionVisitor<R> implements ExpressionVisitor<R> {
return visit((LookupCallExpression) other);
} else if (other instanceof UnresolvedCallExpression) {
return visit((UnresolvedCallExpression) other);
+ } else if (other instanceof SqlCallExpression) {
+ return visit((SqlCallExpression) other);
} else if (other instanceof ResolvedExpression) {
return visit((ResolvedExpression) other);
}
@@ -62,6 +64,8 @@ public abstract class ApiExpressionVisitor<R> implements ExpressionVisitor<R> {
public abstract R visit(UnresolvedCallExpression unresolvedCallExpression);
+ public abstract R visit(SqlCallExpression sqlCall);
+
// --------------------------------------------------------------------------------------------
// other expressions
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 6f55d3d..39d4a4a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -93,6 +93,7 @@ public class ExpressionResolver {
ResolverRules.OVER_WINDOWS,
ResolverRules.FIELD_RESOLVE,
ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
+ ResolverRules.RESOLVE_SQL_CALL,
ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
index 683dc57..2e41d0e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
@@ -71,8 +71,7 @@ public class LookupCallResolver extends ApiExpressionDefaultVisitor<Expression>
@Override
public Expression visitNonApiExpression(Expression other) {
// LookupCallResolver might be called outside of ExpressionResolver, thus we need to
- // additionally
- // handle the ApiExpressions here
+ // additionally handle the ApiExpressions here
if (other instanceof ApiExpression) {
return ((ApiExpression) other).toExpr().accept(this);
} else {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 61ad2fe..8bbb038 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
@@ -32,7 +31,6 @@ import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.TypeLiteralExpression;
import org.apache.flink.table.expressions.UnresolvedCallExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
-import org.apache.flink.table.expressions.resolver.SqlExpressionResolver;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
@@ -62,7 +60,6 @@ import java.util.stream.IntStream;
import static java.util.Collections.singletonList;
import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
-import static org.apache.flink.table.expressions.ExpressionUtils.extractValue;
import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;
import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
@@ -94,7 +91,7 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
private static class ResolvingCallVisitor
extends RuleExpressionVisitor<List<ResolvedExpression>> {
- private @Nullable SurroundingInfo surroundingInfo;
+ private final @Nullable SurroundingInfo surroundingInfo;
ResolvingCallVisitor(ResolutionContext context, @Nullable SurroundingInfo surroundingInfo) {
super(context);
@@ -143,8 +140,6 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
if (definition == BuiltInFunctionDefinitions.FLATTEN) {
return executeFlatten(resolvedArgs);
- } else if (definition == BuiltInFunctionDefinitions.CALL_SQL) {
- return executeCallSql(resolvedArgs);
}
return Collections.singletonList(
@@ -198,23 +193,6 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
.collect(Collectors.toList());
}
- private List<ResolvedExpression> executeCallSql(List<ResolvedExpression> args) {
- final SqlExpressionResolver resolver = resolutionContext.sqlExpressionResolver();
- if (args.size() != 1 || !extractValue(args.get(0), String.class).isPresent()) {
- throw new ValidationException("SQL expression must be a string literal.");
- }
-
- final String sqlExpression =
- extractValue(args.get(0), String.class).orElseThrow(IllegalStateException::new);
- final TableSchema.Builder builder = TableSchema.builder();
- resolutionContext
- .referenceLookup()
- .getAllInputFields()
- .forEach(f -> builder.field(f.getName(), f.getOutputDataType()));
- return Collections.singletonList(
- resolver.resolveExpression(sqlExpression, builder.build()));
- }
-
/** Temporary method until all calls define a type inference. */
private Optional<TypeInference> getOptionalTypeInference(FunctionDefinition definition) {
if (definition instanceof ScalarFunctionDefinition
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
new file mode 100644
index 0000000..f8bb3b3
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.resolver.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.resolver.SqlExpressionResolver;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Resolves {@link SqlCallExpression}s to {@link ResolvedExpression} by delegating to the planner.
+ */
+@Internal
+final class ResolveSqlCallRule implements ResolverRule {
+
+ @Override
+ public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
+ return expression.stream()
+ .map(expr -> expr.accept(new TranslateSqlCallsVisitor(context)))
+ .collect(Collectors.toList());
+ }
+
+ private static class TranslateSqlCallsVisitor extends RuleExpressionVisitor<Expression> {
+
+ TranslateSqlCallsVisitor(ResolutionContext resolutionContext) {
+ super(resolutionContext);
+ }
+
+ @Override
+ public Expression visit(SqlCallExpression sqlCall) {
+ final SqlExpressionResolver resolver = resolutionContext.sqlExpressionResolver();
+
+ final TableSchema.Builder builder = TableSchema.builder();
+ resolutionContext
+ .referenceLookup()
+ .getAllInputFields()
+ .forEach(f -> builder.field(f.getName(), f.getOutputDataType()));
+ return resolver.resolveExpression(sqlCall.getSqlExpression(), builder.build());
+ }
+
+ @Override
+ public Expression visit(UnresolvedCallExpression unresolvedCall) {
+ return unresolvedCall.replaceArgs(resolveChildren(unresolvedCall.getChildren()));
+ }
+
+ @Override
+ protected Expression defaultMethod(Expression expression) {
+ return expression;
+ }
+
+ private List<Expression> resolveChildren(List<Expression> lookupChildren) {
+ return lookupChildren.stream()
+ .map(child -> child.accept(this))
+ .collect(Collectors.toList());
+ }
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index a3674ee..97f05cf 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.expressions.resolver.rules;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ApiExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
/** Contains instances of {@link ResolverRule}. */
@@ -32,6 +33,9 @@ public final class ResolverRules {
*/
public static final ResolverRule FIELD_RESOLVE = new ReferenceResolverRule();
+ /** Resolves {@link SqlCallExpression}s. */
+ public static final ResolverRule RESOLVE_SQL_CALL = new ResolveSqlCallRule();
+
/**
* Resolves call based on argument types. See {@link ResolveCallByArgumentsRule} for details.
*/
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
index f103b37..ce499a9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.LocalReferenceExpression;
import org.apache.flink.table.expressions.LookupCallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.expressions.TableReferenceExpression;
import org.apache.flink.table.expressions.TypeLiteralExpression;
import org.apache.flink.table.expressions.UnresolvedCallExpression;
@@ -103,6 +104,11 @@ public abstract class ApiExpressionDefaultVisitor<T> extends ApiExpressionVisito
return defaultMethod(unresolvedCall);
}
+ @Override
+ public T visit(SqlCallExpression sqlCall) {
+ return defaultMethod(sqlCall);
+ }
+
// --------------------------------------------------------------------------------------------
// other expressions
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/SqlCallExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/SqlCallExpression.java
new file mode 100644
index 0000000..ee04449
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/SqlCallExpression.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A call to a SQL expression.
+ *
+ * <p>The given string is parsed and translated into an {@link Expression} during planning. Only the
+ * translated expression is evaluated during runtime.
+ *
+ * <p>Note: Actually, this class belongs into the {@code flink-table-api-java} module, however,
+ * since this expression is crucial for catalogs when defining persistable computed columns and
+ * watermark strategies, we keep it in {@code flink-table-common} to keep the dependencies of
+ * catalogs low.
+ */
+@PublicEvolving
+public final class SqlCallExpression implements Expression {
+
+ // indicates that this is an unresolved expression consistent with unresolved data types
+ private static final String FORMAT = "[%s]";
+
+ private final String sqlExpression;
+
+ public SqlCallExpression(String sqlExpression) {
+ this.sqlExpression = sqlExpression;
+ }
+
+ public String getSqlExpression() {
+ return sqlExpression;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return String.format(FORMAT, sqlExpression);
+ }
+
+ @Override
+ public List<Expression> getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R> R accept(ExpressionVisitor<R> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public String toString() {
+ return asSummaryString();
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 01b4825..4fdeead 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1458,13 +1458,6 @@ public final class BuiltInFunctionDefinitions {
.outputTypeStrategy(TypeStrategies.MISSING)
.build();
- public static final BuiltInFunctionDefinition CALL_SQL =
- BuiltInFunctionDefinition.newBuilder()
- .name("CALLSQL")
- .kind(OTHER)
- .outputTypeStrategy(TypeStrategies.MISSING)
- .build();
-
public static final Set<FunctionDefinition> WINDOW_PROPERTIES =
new HashSet<>(Arrays.asList(WINDOW_START, WINDOW_END, PROCTIME, ROWTIME));
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 5ab09d9..40aa840 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -394,6 +394,9 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
override def visit(lookupCall: LookupCallExpression): PlannerExpression =
throw new TableException("Unsupported function call: " + lookupCall)
+ override def visit(sqlCall: SqlCallExpression): PlannerExpression =
+ throw new TableException("Unsupported function call: " + sqlCall)
+
override def visit(other: ResolvedExpression): PlannerExpression = visitNonApiExpression(other)
override def visitNonApiExpression(other: Expression): PlannerExpression = {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
index 8e24d70..9a90063 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
import org.junit.ClassRule;
import org.junit.Test;
@@ -183,17 +184,17 @@ public abstract class BuiltInFunctionTestBase {
*/
protected static class TestSpec {
- private final BuiltInFunctionDefinition definition;
+ private final @Nullable BuiltInFunctionDefinition definition;
private final @Nullable String description;
- private Object[] fieldData;
+ private final List<Class<? extends UserDefinedFunction>> functions;
- private @Nullable AbstractDataType<?>[] fieldDataTypes;
+ private final List<TestItem> testItems;
- private List<Class<? extends UserDefinedFunction>> functions;
+ private Object[] fieldData;
- private List<TestItem> testItems;
+ private @Nullable AbstractDataType<?>[] fieldDataTypes;
private TestSpec(BuiltInFunctionDefinition definition, @Nullable String description) {
this.definition = definition;
@@ -203,11 +204,15 @@ public abstract class BuiltInFunctionTestBase {
}
static TestSpec forFunction(BuiltInFunctionDefinition definition) {
- return new TestSpec(definition, null);
+ return forFunction(definition, null);
}
static TestSpec forFunction(BuiltInFunctionDefinition definition, String description) {
- return new TestSpec(definition, description);
+ return new TestSpec(Preconditions.checkNotNull(definition), description);
+ }
+
+ static TestSpec forExpression(String description) {
+ return new TestSpec(null, Preconditions.checkNotNull(description));
}
TestSpec onFieldsWithData(Object... fieldData) {
@@ -259,7 +264,8 @@ public abstract class BuiltInFunctionTestBase {
@Override
public String toString() {
- return definition.getName() + (description != null ? " : " + description : "");
+ return (definition != null ? definition.getName() : "Expression")
+ + (description != null ? " : " + description : "");
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
index 28d6924..6d242fc 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
@@ -81,7 +81,7 @@ public class MiscFunctionsITCase extends BuiltInFunctionTestBase {
DataTypes.DECIMAL(12, 2).notNull())
.testSqlResult(
"TakesNotNull(IFNULL(f0, 12))", 12, DataTypes.INT().notNull()),
- TestSpec.forFunction(BuiltInFunctionDefinitions.CALL_SQL)
+ TestSpec.forExpression("SQL call")
.onFieldsWithData(null, 12, "Hello World")
.andDataTypes(
DataTypes.INT().nullable(),
@@ -95,10 +95,7 @@ public class MiscFunctionsITCase extends BuiltInFunctionTestBase {
"ELLO WORLDhello worl",
DataTypes.STRING().notNull())
.testTableApiError(
- callSql("UPPER(f1)"), "Invalid SQL expression: UPPER(f1)")
- .testTableApiError(
- call("CALLSQL", $("f2")),
- "SQL expression must be a string literal."));
+ callSql("UPPER(f1)"), "Invalid SQL expression: UPPER(f1)"));
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 7378ef3..2760f90 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -26,8 +26,8 @@ import org.apache.flink.table.functions._
import org.apache.flink.table.types.logical.LogicalTypeRoot.SYMBOL
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
-import java.time.{LocalDate, LocalDateTime}
+import java.time.{LocalDate, LocalDateTime}
import org.apache.flink.table.util.Logging
import _root_.scala.collection.JavaConverters._
@@ -846,6 +846,9 @@ class PlannerExpressionConverter private
override def visit(lookupCall: LookupCallExpression): PlannerExpression =
throw new TableException("Unsupported function call: " + lookupCall)
+ override def visit(sqlCall: SqlCallExpression): PlannerExpression =
+ throw new TableException("Unsupported function call: " + sqlCall)
+
override def visitNonApiExpression(other: Expression): PlannerExpression = {
other match {
// already converted planner expressions will pass this visitor without modification