You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/02/23 08:17:44 UTC

[flink] branch master updated: [FLINK-21435][table] Use a dedicated SQL expression in Table API

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c5ab5f8  [FLINK-21435][table] Use a dedicated SQL expression in Table API
c5ab5f8 is described below

commit c5ab5f89687a2046c7a54b9f6c4448fbebed8493
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Feb 22 14:15:04 2021 +0100

    [FLINK-21435][table] Use a dedicated SQL expression in Table API
    
    This closes #14986.
---
 .../org/apache/flink/table/api/Expressions.java    |  7 +-
 .../table/expressions/ApiExpressionUtils.java      |  4 ++
 .../table/expressions/ApiExpressionVisitor.java    |  4 ++
 .../expressions/resolver/ExpressionResolver.java   |  1 +
 .../expressions/resolver/LookupCallResolver.java   |  3 +-
 .../resolver/rules/ResolveCallByArgumentsRule.java | 24 +------
 .../resolver/rules/ResolveSqlCallRule.java         | 79 ++++++++++++++++++++++
 .../expressions/resolver/rules/ResolverRules.java  |  4 ++
 .../utils/ApiExpressionDefaultVisitor.java         |  6 ++
 .../flink/table/expressions/SqlCallExpression.java | 72 ++++++++++++++++++++
 .../functions/BuiltInFunctionDefinitions.java      |  7 --
 .../expressions/PlannerExpressionConverter.scala   |  3 +
 .../planner/functions/BuiltInFunctionTestBase.java | 22 +++---
 .../planner/functions/MiscFunctionsITCase.java     |  7 +-
 .../expressions/PlannerExpressionConverter.scala   |  5 +-
 15 files changed, 201 insertions(+), 47 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 1f5f6cc..4dfcde1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -540,7 +541,7 @@ public final class Expressions {
      * table-valued functions are not supported. Sub-queries are also not allowed.
      */
     public static ApiExpression callSql(String sqlExpression) {
-        return apiCall(BuiltInFunctionDefinitions.CALL_SQL, sqlExpression);
+        return apiSqlCall(sqlExpression);
     }
 
     private static ApiExpression apiCall(FunctionDefinition functionDefinition, Object... args) {
@@ -568,4 +569,8 @@ public final class Expressions {
                         .collect(Collectors.toList());
         return new ApiExpression(unresolvedCall(functionDefinition, arguments));
     }
+
+    private static ApiExpression apiSqlCall(String sqlExpression) {
+        return new ApiExpression(new SqlCallExpression(sqlExpression));
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index 356bf24..2d87882 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -304,6 +304,10 @@ public final class ApiExpressionUtils {
                         .collect(Collectors.toList()));
     }
 
+    public static SqlCallExpression sqlCall(String sqlExpression) {
+        return new SqlCallExpression(sqlExpression);
+    }
+
     public static Expression toMonthInterval(Expression e, int multiplier) {
         return ExpressionUtils.extractValue(e, BigDecimal.class)
                 .map((v) -> intervalOfMonths(v.intValue() * multiplier))
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java
index b0c2bc6..f0b34bb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java
@@ -35,6 +35,8 @@ public abstract class ApiExpressionVisitor<R> implements ExpressionVisitor<R> {
             return visit((LookupCallExpression) other);
         } else if (other instanceof UnresolvedCallExpression) {
             return visit((UnresolvedCallExpression) other);
+        } else if (other instanceof SqlCallExpression) {
+            return visit((SqlCallExpression) other);
         } else if (other instanceof ResolvedExpression) {
             return visit((ResolvedExpression) other);
         }
@@ -62,6 +64,8 @@ public abstract class ApiExpressionVisitor<R> implements ExpressionVisitor<R> {
 
     public abstract R visit(UnresolvedCallExpression unresolvedCallExpression);
 
+    public abstract R visit(SqlCallExpression sqlCall);
+
     // --------------------------------------------------------------------------------------------
     // other expressions
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 6f55d3d..39d4a4a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -93,6 +93,7 @@ public class ExpressionResolver {
                 ResolverRules.OVER_WINDOWS,
                 ResolverRules.FIELD_RESOLVE,
                 ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
+                ResolverRules.RESOLVE_SQL_CALL,
                 ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
     }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
index 683dc57..2e41d0e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
@@ -71,8 +71,7 @@ public class LookupCallResolver extends ApiExpressionDefaultVisitor<Expression>
     @Override
     public Expression visitNonApiExpression(Expression other) {
         // LookupCallResolver might be called outside of ExpressionResolver, thus we need to
-        // additionally
-        // handle the ApiExpressions here
+        // additionally handle the ApiExpressions here
         if (other instanceof ApiExpression) {
             return ((ApiExpression) other).toExpr().accept(this);
         } else {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 61ad2fe..8bbb038 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
@@ -32,7 +31,6 @@ import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
-import org.apache.flink.table.expressions.resolver.SqlExpressionResolver;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -62,7 +60,6 @@ import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonList;
 import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
-import static org.apache.flink.table.expressions.ExpressionUtils.extractValue;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;
 import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
 import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
@@ -94,7 +91,7 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
     private static class ResolvingCallVisitor
             extends RuleExpressionVisitor<List<ResolvedExpression>> {
 
-        private @Nullable SurroundingInfo surroundingInfo;
+        private final @Nullable SurroundingInfo surroundingInfo;
 
         ResolvingCallVisitor(ResolutionContext context, @Nullable SurroundingInfo surroundingInfo) {
             super(context);
@@ -143,8 +140,6 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 
             if (definition == BuiltInFunctionDefinitions.FLATTEN) {
                 return executeFlatten(resolvedArgs);
-            } else if (definition == BuiltInFunctionDefinitions.CALL_SQL) {
-                return executeCallSql(resolvedArgs);
             }
 
             return Collections.singletonList(
@@ -198,23 +193,6 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
                     .collect(Collectors.toList());
         }
 
-        private List<ResolvedExpression> executeCallSql(List<ResolvedExpression> args) {
-            final SqlExpressionResolver resolver = resolutionContext.sqlExpressionResolver();
-            if (args.size() != 1 || !extractValue(args.get(0), String.class).isPresent()) {
-                throw new ValidationException("SQL expression must be a string literal.");
-            }
-
-            final String sqlExpression =
-                    extractValue(args.get(0), String.class).orElseThrow(IllegalStateException::new);
-            final TableSchema.Builder builder = TableSchema.builder();
-            resolutionContext
-                    .referenceLookup()
-                    .getAllInputFields()
-                    .forEach(f -> builder.field(f.getName(), f.getOutputDataType()));
-            return Collections.singletonList(
-                    resolver.resolveExpression(sqlExpression, builder.build()));
-        }
-
         /** Temporary method until all calls define a type inference. */
         private Optional<TypeInference> getOptionalTypeInference(FunctionDefinition definition) {
             if (definition instanceof ScalarFunctionDefinition
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
new file mode 100644
index 0000000..f8bb3b3
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveSqlCallRule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.resolver.rules;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.resolver.SqlExpressionResolver;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Resolves {@link SqlCallExpression}s to {@link ResolvedExpression} by delegating to the planner.
+ */
+@Internal
+final class ResolveSqlCallRule implements ResolverRule {
+
+    @Override
+    public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
+        return expression.stream()
+                .map(expr -> expr.accept(new TranslateSqlCallsVisitor(context)))
+                .collect(Collectors.toList());
+    }
+
+    private static class TranslateSqlCallsVisitor extends RuleExpressionVisitor<Expression> {
+
+        TranslateSqlCallsVisitor(ResolutionContext resolutionContext) {
+            super(resolutionContext);
+        }
+
+        @Override
+        public Expression visit(SqlCallExpression sqlCall) {
+            final SqlExpressionResolver resolver = resolutionContext.sqlExpressionResolver();
+
+            final TableSchema.Builder builder = TableSchema.builder();
+            resolutionContext
+                    .referenceLookup()
+                    .getAllInputFields()
+                    .forEach(f -> builder.field(f.getName(), f.getOutputDataType()));
+            return resolver.resolveExpression(sqlCall.getSqlExpression(), builder.build());
+        }
+
+        @Override
+        public Expression visit(UnresolvedCallExpression unresolvedCall) {
+            return unresolvedCall.replaceArgs(resolveChildren(unresolvedCall.getChildren()));
+        }
+
+        @Override
+        protected Expression defaultMethod(Expression expression) {
+            return expression;
+        }
+
+        private List<Expression> resolveChildren(List<Expression> lookupChildren) {
+            return lookupChildren.stream()
+                    .map(child -> child.accept(this))
+                    .collect(Collectors.toList());
+        }
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index a3674ee..97f05cf 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ApiExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 
 /** Contains instances of {@link ResolverRule}. */
@@ -32,6 +33,9 @@ public final class ResolverRules {
      */
     public static final ResolverRule FIELD_RESOLVE = new ReferenceResolverRule();
 
+    /** Resolves {@link SqlCallExpression}s. */
+    public static final ResolverRule RESOLVE_SQL_CALL = new ResolveSqlCallRule();
+
     /**
      * Resolves call based on argument types. See {@link ResolveCallByArgumentsRule} for details.
      */
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
index f103b37..ce499a9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
 import org.apache.flink.table.expressions.LookupCallExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
@@ -103,6 +104,11 @@ public abstract class ApiExpressionDefaultVisitor<T> extends ApiExpressionVisito
         return defaultMethod(unresolvedCall);
     }
 
+    @Override
+    public T visit(SqlCallExpression sqlCall) {
+        return defaultMethod(sqlCall);
+    }
+
     // --------------------------------------------------------------------------------------------
     // other expressions
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/SqlCallExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/SqlCallExpression.java
new file mode 100644
index 0000000..ee04449
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/SqlCallExpression.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A call to a SQL expression.
+ *
+ * <p>The given string is parsed and translated into an {@link Expression} during planning. Only the
+ * translated expression is evaluated during runtime.
+ *
+ * <p>Note: Actually, this class belongs into the {@code flink-table-api-java} module, however,
+ * since this expression is crucial for catalogs when defining persistable computed columns and
+ * watermark strategies, we keep it in {@code flink-table-common} to keep the dependencies of
+ * catalogs low.
+ */
+@PublicEvolving
+public final class SqlCallExpression implements Expression {
+
+    // indicates that this is an unresolved expression consistent with unresolved data types
+    private static final String FORMAT = "[%s]";
+
+    private final String sqlExpression;
+
+    public SqlCallExpression(String sqlExpression) {
+        this.sqlExpression = sqlExpression;
+    }
+
+    public String getSqlExpression() {
+        return sqlExpression;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return String.format(FORMAT, sqlExpression);
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public <R> R accept(ExpressionVisitor<R> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
+    public String toString() {
+        return asSummaryString();
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 01b4825..4fdeead 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1458,13 +1458,6 @@ public final class BuiltInFunctionDefinitions {
                     .outputTypeStrategy(TypeStrategies.MISSING)
                     .build();
 
-    public static final BuiltInFunctionDefinition CALL_SQL =
-            BuiltInFunctionDefinition.newBuilder()
-                    .name("CALLSQL")
-                    .kind(OTHER)
-                    .outputTypeStrategy(TypeStrategies.MISSING)
-                    .build();
-
     public static final Set<FunctionDefinition> WINDOW_PROPERTIES =
             new HashSet<>(Arrays.asList(WINDOW_START, WINDOW_END, PROCTIME, ROWTIME));
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 5ab09d9..40aa840 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -394,6 +394,9 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
   override def visit(lookupCall: LookupCallExpression): PlannerExpression =
     throw new TableException("Unsupported function call: " + lookupCall)
 
+  override def visit(sqlCall: SqlCallExpression): PlannerExpression =
+    throw new TableException("Unsupported function call: " + sqlCall)
+
   override def visit(other: ResolvedExpression): PlannerExpression = visitNonApiExpression(other)
 
   override def visitNonApiExpression(other: Expression): PlannerExpression = {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
index 8e24d70..9a90063 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -183,17 +184,17 @@ public abstract class BuiltInFunctionTestBase {
      */
     protected static class TestSpec {
 
-        private final BuiltInFunctionDefinition definition;
+        private final @Nullable BuiltInFunctionDefinition definition;
 
         private final @Nullable String description;
 
-        private Object[] fieldData;
+        private final List<Class<? extends UserDefinedFunction>> functions;
 
-        private @Nullable AbstractDataType<?>[] fieldDataTypes;
+        private final List<TestItem> testItems;
 
-        private List<Class<? extends UserDefinedFunction>> functions;
+        private Object[] fieldData;
 
-        private List<TestItem> testItems;
+        private @Nullable AbstractDataType<?>[] fieldDataTypes;
 
         private TestSpec(BuiltInFunctionDefinition definition, @Nullable String description) {
             this.definition = definition;
@@ -203,11 +204,15 @@ public abstract class BuiltInFunctionTestBase {
         }
 
         static TestSpec forFunction(BuiltInFunctionDefinition definition) {
-            return new TestSpec(definition, null);
+            return forFunction(definition, null);
         }
 
         static TestSpec forFunction(BuiltInFunctionDefinition definition, String description) {
-            return new TestSpec(definition, description);
+            return new TestSpec(Preconditions.checkNotNull(definition), description);
+        }
+
+        static TestSpec forExpression(String description) {
+            return new TestSpec(null, Preconditions.checkNotNull(description));
         }
 
         TestSpec onFieldsWithData(Object... fieldData) {
@@ -259,7 +264,8 @@ public abstract class BuiltInFunctionTestBase {
 
         @Override
         public String toString() {
-            return definition.getName() + (description != null ? " : " + description : "");
+            return (definition != null ? definition.getName() : "Expression")
+                    + (description != null ? " : " + description : "");
         }
     }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
index 28d6924..6d242fc 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
@@ -81,7 +81,7 @@ public class MiscFunctionsITCase extends BuiltInFunctionTestBase {
                                 DataTypes.DECIMAL(12, 2).notNull())
                         .testSqlResult(
                                 "TakesNotNull(IFNULL(f0, 12))", 12, DataTypes.INT().notNull()),
-                TestSpec.forFunction(BuiltInFunctionDefinitions.CALL_SQL)
+                TestSpec.forExpression("SQL call")
                         .onFieldsWithData(null, 12, "Hello World")
                         .andDataTypes(
                                 DataTypes.INT().nullable(),
@@ -95,10 +95,7 @@ public class MiscFunctionsITCase extends BuiltInFunctionTestBase {
                                 "ELLO WORLDhello worl",
                                 DataTypes.STRING().notNull())
                         .testTableApiError(
-                                callSql("UPPER(f1)"), "Invalid SQL expression: UPPER(f1)")
-                        .testTableApiError(
-                                call("CALLSQL", $("f2")),
-                                "SQL expression must be a string literal."));
+                                callSql("UPPER(f1)"), "Invalid SQL expression: UPPER(f1)"));
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 7378ef3..2760f90 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -26,8 +26,8 @@ import org.apache.flink.table.functions._
 import org.apache.flink.table.types.logical.LogicalTypeRoot.SYMBOL
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
 import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
-import java.time.{LocalDate, LocalDateTime}
 
+import java.time.{LocalDate, LocalDateTime}
 import org.apache.flink.table.util.Logging
 
 import _root_.scala.collection.JavaConverters._
@@ -846,6 +846,9 @@ class PlannerExpressionConverter private
   override def visit(lookupCall: LookupCallExpression): PlannerExpression =
     throw new TableException("Unsupported function call: " + lookupCall)
 
+  override def visit(sqlCall: SqlCallExpression): PlannerExpression =
+    throw new TableException("Unsupported function call: " + sqlCall)
+
   override def visitNonApiExpression(other: Expression): PlannerExpression = {
     other match {
       // already converted planner expressions will pass this visitor without modification