You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/02 11:11:00 UTC

[flink] branch master updated (c3f2ad8 -> 036c549)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c3f2ad8  [FLINK-12809][table-api] Introduce PartitionableTableSink for supporting writing data into partitions
     new 6161043  [FLINK-13028][table-api-java] Extract legacy type inference logic
     new 272ac71  [FLINK-13028][table-api-java] Remove planner expression from ExpressionResolver
     new 8c1968a  [FLINK-13028][table-api-java] Refactor local over windows
     new 14fe641  [FLINK-13028][table] Refactor expression package structure
     new 036c549  [FLINK-13028][table-api-java] Merge flatten and call resolution rule

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/api/OverWindowPartitionedOrdered.java    |   2 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   2 +-
 .../apache/flink/table/api/internal/TableImpl.java |   2 +-
 .../PlannerExpressionParser.java                   |   8 +-
 .../table/delegation/PlannerTypeInferenceUtil.java |  75 ++++++++
 .../flink/table/expressions/ExpressionParser.java  |   1 +
 .../expressions/resolver}/ExpressionResolver.java  | 137 ++++----------
 .../expressions/resolver/LocalOverWindow.java      |  75 ++++++++
 .../{ => resolver}/LookupCallResolver.java         |   6 +-
 .../resolver}/lookups/FieldReferenceLookup.java    |   2 +-
 .../lookups/TableReferenceLookup.java              |   2 +-
 .../resolver}/rules/ExpandColumnFunctionsRule.java |   8 +-
 .../resolver}/rules/LookupCallByNameRule.java      |   4 +-
 .../resolver}/rules/OverWindowResolverRule.java    |  68 +++++--
 .../rules/QualifyBuiltInFunctionsRule.java         |   2 +-
 .../resolver}/rules/ReferenceResolverRule.java     |   7 +-
 .../rules/ResolveCallByArgumentsRule.java          | 209 +++++++++------------
 .../expressions/resolver}/rules/ResolverRule.java  |  20 +-
 .../expressions/resolver}/rules/ResolverRules.java |   7 +-
 .../resolver}/rules/RuleExpressionVisitor.java     |   6 +-
 .../rules/StarReferenceFlatteningRule.java         |   2 +-
 .../{ => utils}/ApiExpressionDefaultVisitor.java   |  13 +-
 .../{ => utils}/ApiExpressionUtils.java            |  11 +-
 .../ResolvedExpressionDefaultVisitor.java          |  10 +-
 .../operations/OperationExpressionsUtils.java      |  10 +-
 .../flink/table/typeutils/FieldInfoUtils.java      |   2 +-
 .../flink/table/operations/QueryOperationTest.java |   2 +-
 .../flink/table/expressions/ExpressionBuilder.java |   1 +
 .../functions/aggfunctions/AvgAggFunction.java     |   2 +-
 .../functions/aggfunctions/ConcatAggFunction.java  |   2 +-
 .../functions/aggfunctions/Count1AggFunction.java  |   2 +-
 .../functions/aggfunctions/CountAggFunction.java   |   2 +-
 .../aggfunctions/DeclarativeAggregateFunction.java |   2 +-
 .../functions/aggfunctions/IncrSumAggFunction.java |   2 +-
 .../IncrSumWithRetractAggFunction.java             |   2 +-
 .../functions/aggfunctions/LeadLagAggFunction.java |   2 +-
 .../functions/aggfunctions/MaxAggFunction.java     |   2 +-
 .../functions/aggfunctions/MinAggFunction.java     |   2 +-
 .../functions/aggfunctions/RankAggFunction.java    |   2 +-
 .../aggfunctions/RankLikeAggFunctionBase.java      |   2 +-
 .../aggfunctions/RowNumberAggFunction.java         |   2 +-
 .../aggfunctions/SingleValueAggFunction.java       |   2 +-
 .../functions/aggfunctions/Sum0AggFunction.java    |   2 +-
 .../functions/aggfunctions/SumAggFunction.java     |   2 +-
 .../aggfunctions/SumWithRetractAggFunction.java    |   2 +-
 .../table/codegen/agg/DeclarativeAggCodeGen.scala  |   4 +-
 .../table/codegen/agg/batch/AggCodeGenHelper.scala |   3 +-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |   3 +-
 .../logical/LogicalWindowAggregateRuleBase.scala   |   2 +-
 .../flink/table/plan/util/RexNodeExtractor.scala   |   2 +-
 .../flink/table/sources/TableSourceUtil.scala      |   2 +-
 .../table/sources/tsextractors/ExistingField.scala |   2 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   2 +-
 .../table/plan/util/RexNodeExtractorTest.scala     |   2 +-
 .../apache/flink/table/util/testTableSources.scala |   2 +-
 .../expressions/PlannerTypeInferenceUtilImpl.java  | 140 ++++++++++++++
 .../table/expressions/rules/FlattenCallRule.java   |  90 ---------
 .../operations/AggregateOperationFactory.java      |   6 +-
 .../table/operations/AliasOperationUtils.java      |   8 +-
 .../table/operations/CalculatedTableFactory.java   |   4 +-
 .../table/operations/ColumnOperationUtils.java     |   6 +-
 .../table/operations/JoinOperationFactory.java     |   2 +-
 .../operations/OperationTreeBuilderFactory.java    |   2 +-
 .../operations/ProjectionOperationFactory.java     |   4 +-
 .../table/operations/SortOperationFactory.java     |   4 +-
 .../flink/table/plan/QueryOperationConverter.java  |   4 +-
 .../table/api/internal/BatchTableEnvImpl.scala     |   3 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |   2 +-
 .../flink/table/api/scala/expressionDsl.scala      |   2 +-
 .../flink/table/expressions/ExpressionBridge.scala |   1 +
 .../expressions/PlannerExpressionParserImpl.scala  |   3 +-
 .../operations/OperationTreeBuilderImpl.scala      |   8 +-
 .../flink/table/plan/logical/groupWindows.scala    |  15 +-
 .../table/plan/util/RexProgramExtractor.scala      |   2 +-
 .../flink/table/api/batch/table/CalcTest.scala     |   1 +
 .../flink/table/expressions/KeywordParseTest.scala |   2 +-
 76 files changed, 608 insertions(+), 453 deletions(-)
 rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/{expressions => delegation}/PlannerExpressionParser.java (87%)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/ExpressionResolver.java (72%)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LocalOverWindow.java
 rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/{ => resolver}/LookupCallResolver.java (87%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/lookups/FieldReferenceLookup.java (98%)
 rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/{ => resolver}/lookups/TableReferenceLookup.java (95%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/rules/ExpandColumnFunctionsRule.java (96%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/rules/LookupCallByNameRule.java (92%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/rules/OverWindowResolverRule.java (55%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/rules/QualifyBuiltInFunctionsRule.java (97%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/rules/ReferenceResolverRule.java (92%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/rules/ResolveCallByArgumentsRule.java (59%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/rules/ResolverRule.java (80%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/rules/ResolverRules.java (92%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/rules/RuleExpressionVisitor.java (84%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver}/rules/StarReferenceFlatteningRule.java (97%)
 rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/{ => utils}/ApiExpressionDefaultVisitor.java (80%)
 rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/{ => utils}/ApiExpressionUtils.java (87%)
 rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/{ => utils}/ResolvedExpressionDefaultVisitor.java (76%)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
 delete mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/FlattenCallRule.java


[flink] 03/05: [FLINK-13028][table-api-java] Refactor local over windows

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8c1968ac2880c3784c6c35ffb819932483208807
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 1 09:12:41 2019 +0200

    [FLINK-13028][table-api-java] Refactor local over windows
---
 .../flink/table/expressions/LocalOverWindow.java   | 76 ++++++++++++++++++++++
 .../table/expressions/ExpressionResolver.java      | 21 +++---
 .../expressions/rules/OverWindowResolverRule.java  | 16 ++---
 .../table/expressions/rules/ResolverRule.java      |  6 +-
 .../flink/table/plan/logical/groupWindows.scala    | 15 +----
 5 files changed, 98 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
new file mode 100644
index 0000000..45e1a7a
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Local over window created during expression resolution.
+ */
+@Internal
+public final class LocalOverWindow {
+
+	private Expression alias;
+
+	private List<Expression> partitionBy;
+
+	private Expression orderBy;
+
+	private Expression preceding;
+
+	private @Nullable Expression following;
+
+	LocalOverWindow(
+			Expression alias,
+			List<Expression> partitionBy,
+			Expression orderBy,
+			Expression preceding,
+			@Nullable Expression following) {
+		this.alias = alias;
+		this.partitionBy = partitionBy;
+		this.orderBy = orderBy;
+		this.preceding = preceding;
+		this.following = following;
+	}
+
+	public Expression getAlias() {
+		return alias;
+	}
+
+	public List<Expression> getPartitionBy() {
+		return partitionBy;
+	}
+
+	public Expression getOrderBy() {
+		return orderBy;
+	}
+
+	public Expression getPreceding() {
+		return preceding;
+	}
+
+	public Optional<Expression> getFollowing() {
+		return Optional.ofNullable(following);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
index 015751c..ea45f33 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.expressions.rules.ResolverRules;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
@@ -100,13 +99,13 @@ public class ExpressionResolver {
 
 	private final Map<String, LocalReferenceExpression> localReferences;
 
-	private final Map<Expression, LogicalOverWindow> overWindows;
+	private final Map<Expression, LocalOverWindow> localOverWindows;
 
 	private ExpressionResolver(
 			TableReferenceLookup tableLookup,
 			FunctionLookup functionLookup,
 			FieldReferenceLookup fieldLookup,
-			List<OverWindow> overWindows,
+			List<OverWindow> localOverWindows,
 			List<LocalReferenceExpression> localReferences) {
 		this.tableLookup = Preconditions.checkNotNull(tableLookup);
 		this.fieldLookup = Preconditions.checkNotNull(fieldLookup);
@@ -116,7 +115,7 @@ public class ExpressionResolver {
 			LocalReferenceExpression::getName,
 			Function.identity()
 		));
-		this.overWindows = prepareOverWindows(overWindows);
+		this.localOverWindows = prepareOverWindows(localOverWindows);
 	}
 
 	/**
@@ -187,11 +186,11 @@ public class ExpressionResolver {
 			);
 	}
 
-	private Map<Expression, LogicalOverWindow> prepareOverWindows(List<OverWindow> overWindows) {
+	private Map<Expression, LocalOverWindow> prepareOverWindows(List<OverWindow> overWindows) {
 		return overWindows.stream()
 			.map(this::resolveOverWindow)
 			.collect(Collectors.toMap(
-				LogicalOverWindow::alias,
+				LocalOverWindow::getAlias,
 				Function.identity()
 			));
 	}
@@ -262,18 +261,18 @@ public class ExpressionResolver {
 		}
 
 		@Override
-		public Optional<LogicalOverWindow> getOverWindow(Expression alias) {
-			return Optional.ofNullable(overWindows.get(alias));
+		public Optional<LocalOverWindow> getOverWindow(Expression alias) {
+			return Optional.ofNullable(localOverWindows.get(alias));
 		}
 	}
 
-	private LogicalOverWindow resolveOverWindow(OverWindow overWindow) {
-		return new LogicalOverWindow(
+	private LocalOverWindow resolveOverWindow(OverWindow overWindow) {
+		return new LocalOverWindow(
 			overWindow.getAlias(),
 			prepareExpressions(overWindow.getPartitioning()),
 			resolveFieldsInSingleExpression(overWindow.getOrder()),
 			resolveFieldsInSingleExpression(overWindow.getPreceding()),
-			overWindow.getFollowing().map(this::resolveFieldsInSingleExpression)
+			overWindow.getFollowing().map(this::resolveFieldsInSingleExpression).orElse(null)
 		);
 	}
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
index fcec1ac..3420437 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.LocalOverWindow;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.ArrayList;
@@ -68,17 +68,17 @@ final class OverWindowResolverRule implements ResolverRule {
 				List<Expression> children = unresolvedCall.getChildren();
 				Expression alias = children.get(1);
 
-				LogicalOverWindow referenceWindow = resolutionContext.getOverWindow(alias)
+				LocalOverWindow referenceWindow = resolutionContext.getOverWindow(alias)
 					.orElseThrow(() -> new ValidationException("Could not resolve over call."));
 
 				Expression following = calculateOverWindowFollowing(referenceWindow);
 				List<Expression> newArgs = new ArrayList<>(asList(
 					children.get(0),
-					referenceWindow.orderBy(),
-					referenceWindow.preceding(),
+					referenceWindow.getOrderBy(),
+					referenceWindow.getPreceding(),
 					following));
 
-				newArgs.addAll(referenceWindow.partitionBy());
+				newArgs.addAll(referenceWindow.getPartitionBy());
 
 				return unresolvedCall(unresolvedCall.getFunctionDefinition(), newArgs.toArray(new Expression[0]));
 			} else {
@@ -90,9 +90,9 @@ final class OverWindowResolverRule implements ResolverRule {
 			}
 		}
 
-		private Expression calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
-			return referenceWindow.following().orElseGet(() -> {
-					WindowKind kind = referenceWindow.preceding().accept(OVER_WINDOW_KIND_EXTRACTOR);
+		private Expression calculateOverWindowFollowing(LocalOverWindow referenceWindow) {
+			return referenceWindow.getFollowing().orElseGet(() -> {
+					WindowKind kind = referenceWindow.getPreceding().accept(OVER_WINDOW_KIND_EXTRACTOR);
 					if (kind == WindowKind.ROW) {
 						return unresolvedCall(BuiltInFunctionDefinitions.CURRENT_ROW);
 					} else {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
index af51499..802b550 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionResolver;
+import org.apache.flink.table.expressions.LocalOverWindow;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
 import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
 import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.plan.logical.LogicalOverWindow;
 
 import java.util.List;
 import java.util.Optional;
@@ -73,8 +73,8 @@ public interface ResolverRule {
 		Optional<LocalReferenceExpression> getLocalReference(String alias);
 
 		/**
-		 * Lookup for over windows.
+		 * Access to available local over windows.
 		 */
-		Optional<LogicalOverWindow> getOverWindow(Expression alias);
+		Optional<LocalOverWindow> getOverWindow(Expression alias);
 	}
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index 50651e9..02b735e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -18,20 +18,7 @@
 
 package org.apache.flink.table.plan.logical
 
-import java.util.{Optional, List => JList}
-
-import org.apache.flink.table.expressions.{Expression, PlannerExpression}
-
-// ------------------------------------------------------------------------------------------------
-// Over windows
-// ------------------------------------------------------------------------------------------------
-
-case class LogicalOverWindow(
-    alias: Expression,
-    partitionBy: JList[Expression],
-    orderBy: Expression,
-    preceding: Expression,
-    following: Optional[Expression])
+import org.apache.flink.table.expressions.PlannerExpression
 
 // ------------------------------------------------------------------------------------------------
 // Group windows


[flink] 01/05: [FLINK-13028][table-api-java] Extract legacy type inference logic

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6161043a2afbd458f2587f3f63df60f620c755b7
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Jun 28 13:34:29 2019 +0200

    [FLINK-13028][table-api-java] Extract legacy type inference logic
---
 .../table/delegation/PlannerTypeInferenceUtil.java |  75 +++++++++++
 .../expressions/PlannerTypeInferenceUtilImpl.java  | 140 +++++++++++++++++++++
 .../rules/ResolveCallByArgumentsRule.java          | 127 ++++---------------
 3 files changed, 237 insertions(+), 105 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
new file mode 100644
index 0000000..4e003ff
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeInferenceUtil;
+
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+/**
+ * Temporary utility for validation and output type inference until all {@code PlannerExpression} are
+ * upgraded to work with {@link TypeInferenceUtil}.
+ */
+@Internal
+public interface PlannerTypeInferenceUtil {
+
+	static PlannerTypeInferenceUtil create() {
+		return SingletonPlannerTypeInferenceUtil.getPlannerTypeInferenceUtil();
+	}
+
+	/**
+	 * Same behavior as {@link TypeInferenceUtil#runTypeInference(TypeInference, CallContext)}.
+	 */
+	TypeInferenceUtil.Result runTypeInference(
+		UnresolvedCallExpression unresolvedCall,
+		List<ResolvedExpression> resolvedArgs);
+
+	/**
+	 * A singleton pattern utility for avoiding creating many {@link PlannerTypeInferenceUtil}.
+	 */
+	class SingletonPlannerTypeInferenceUtil {
+
+		private static PlannerTypeInferenceUtil plannerTypeInferenceUtil;
+
+		public static PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() {
+			if (plannerTypeInferenceUtil == null) {
+				try {
+					final Class<?> clazz =
+						Class.forName("org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl");
+					final Constructor<?> con = clazz.getConstructor();
+					plannerTypeInferenceUtil = (PlannerTypeInferenceUtil) con.newInstance();
+				} catch (Throwable t) {
+					throw new TableException("Instantiation of PlannerTypeInferenceUtil failed.", t);
+				}
+			}
+			return plannerTypeInferenceUtil;
+		}
+
+		private SingletonPlannerTypeInferenceUtil() {
+			// no instantiation
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
new file mode 100644
index 0000000..30c3421
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeInferenceUtil;
+import org.apache.flink.table.typeutils.TypeCoercion;
+import org.apache.flink.table.validate.ValidationFailure;
+import org.apache.flink.table.validate.ValidationResult;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava;
+
+/**
+ * Implementation of {@link PlannerTypeInferenceUtil}.
+ */
+@Internal
+public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceUtil {
+
+	private static final PlannerExpressionConverter CONVERTER = PlannerExpressionConverter.INSTANCE();
+
+	@Override
+	public TypeInferenceUtil.Result runTypeInference(
+			UnresolvedCallExpression unresolvedCall,
+			List<ResolvedExpression> resolvedArgs) {
+		final PlannerExpression plannerCall = unresolvedCall.accept(CONVERTER);
+
+		if (plannerCall instanceof InputTypeSpec) {
+			return resolveWithCastedAssignment(
+				unresolvedCall,
+				resolvedArgs,
+				toJava(((InputTypeSpec) plannerCall).expectedTypes()),
+				plannerCall.resultType());
+		} else {
+			validateArguments(plannerCall);
+
+			final List<DataType> expectedArgumentTypes = resolvedArgs.stream()
+				.map(ResolvedExpression::getOutputDataType)
+				.collect(Collectors.toList());
+
+			return new TypeInferenceUtil.Result(
+				expectedArgumentTypes,
+				null,
+				fromLegacyInfoToDataType(plannerCall.resultType()));
+		}
+	}
+
+	private TypeInferenceUtil.Result resolveWithCastedAssignment(
+			UnresolvedCallExpression unresolvedCall,
+			List<ResolvedExpression> args,
+			List<TypeInformation<?>> expectedTypes,
+			TypeInformation<?> resultType) {
+
+		final List<PlannerExpression> plannerArgs = unresolvedCall.getChildren()
+			.stream()
+			.map(e -> e.accept(CONVERTER))
+			.collect(Collectors.toList());
+
+		final List<DataType> castedArgs = IntStream.range(0, plannerArgs.size())
+			.mapToObj(idx -> castIfNeeded(
+				args.get(idx),
+				plannerArgs.get(idx),
+				expectedTypes.get(idx)))
+			.collect(Collectors.toList());
+
+		return new TypeInferenceUtil.Result(
+			castedArgs,
+			null,
+			fromLegacyInfoToDataType(resultType));
+	}
+
+	private void validateArguments(PlannerExpression plannerCall) {
+		if (!plannerCall.valid()) {
+			throw new ValidationException(
+				getValidationErrorMessage(plannerCall)
+					.orElse("Unexpected behavior, validation failed but can't get error messages!"));
+		}
+	}
+
+	/**
+	 * Return the validation error message of this {@link PlannerExpression} or return the
+	 * validation error message of it's children if it passes the validation. Return empty if
+	 * all validation succeeded.
+	 */
+	private Optional<String> getValidationErrorMessage(PlannerExpression plannerCall) {
+		ValidationResult validationResult = plannerCall.validateInput();
+		if (validationResult instanceof ValidationFailure) {
+			return Optional.of(((ValidationFailure) validationResult).message());
+		} else {
+			for (Expression plannerExpression: plannerCall.getChildren()) {
+				Optional<String> errorMessage = getValidationErrorMessage((PlannerExpression) plannerExpression);
+				if (errorMessage.isPresent()) {
+					return errorMessage;
+				}
+			}
+		}
+		return Optional.empty();
+	}
+
+	private DataType castIfNeeded(
+			ResolvedExpression child,
+			PlannerExpression plannerChild,
+			TypeInformation<?> expectedType) {
+		TypeInformation<?> actualType = plannerChild.resultType();
+		if (actualType.equals(expectedType)) {
+			return child.getOutputDataType();
+		} else if (TypeCoercion.canSafelyCast(actualType, expectedType)) {
+			return fromLegacyInfoToDataType(expectedType);
+		} else {
+			throw new ValidationException(String.format("Incompatible type of argument: %s Expected: %s",
+				child,
+				expectedType));
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
index d308891..d2ad89c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
@@ -19,13 +19,11 @@
 package org.apache.flink.table.expressions.rules;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
+import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.InputTypeSpec;
-import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -37,9 +35,6 @@ import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.inference.TypeStrategies;
-import org.apache.flink.table.typeutils.TypeCoercion;
-import org.apache.flink.table.validate.ValidationFailure;
-import org.apache.flink.table.validate.ValidationResult;
 import org.apache.flink.util.Preconditions;
 
 import java.util.List;
@@ -47,12 +42,11 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
-import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava;
-
 /**
- * It checks if a {@link UnresolvedCallExpression} can work with given arguments.
- * If the call expects different types of arguments, but the given arguments
+ * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers
+ * the output data type. All function calls are resolved {@link CallExpression} after applying this rule.
+ *
+ * <p>If the call expects different types of arguments, but the given arguments
  * have types that can be casted, a {@link BuiltInFunctionDefinitions#CAST}
  * expression is inserted.
  */
@@ -88,6 +82,7 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 			if (unresolvedCall.getFunctionDefinition() instanceof BuiltInFunctionDefinition) {
 				final BuiltInFunctionDefinition definition =
 					(BuiltInFunctionDefinition) unresolvedCall.getFunctionDefinition();
+
 				if (definition.getTypeInference().getOutputTypeStrategy() != TypeStrategies.MISSING) {
 					return runTypeInference(
 						unresolvedCall,
@@ -116,6 +111,21 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 			return unresolvedCall.resolve(adaptedArguments, inferenceResult.getOutputDataType());
 		}
 
+		private ResolvedExpression runLegacyTypeInference(
+				UnresolvedCallExpression unresolvedCall,
+				List<ResolvedExpression> resolvedArgs) {
+
+			final PlannerTypeInferenceUtil util = PlannerTypeInferenceUtil.create();
+
+			final TypeInferenceUtil.Result inferenceResult = util.runTypeInference(
+				unresolvedCall,
+				resolvedArgs);
+
+			final List<ResolvedExpression> adaptedArguments = adaptArguments(inferenceResult, resolvedArgs);
+
+			return unresolvedCall.resolve(adaptedArguments, inferenceResult.getOutputDataType());
+		}
+
 		/**
 		 * Adapts the arguments according to the properties of the {@link TypeInferenceUtil.Result}.
 		 */
@@ -142,99 +152,6 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 		protected Expression defaultMethod(Expression expression) {
 			return expression;
 		}
-
-		// ----------------------------------------------------------------------------------------
-		// legacy code
-		// ----------------------------------------------------------------------------------------
-
-		private ResolvedExpression runLegacyTypeInference(
-				UnresolvedCallExpression unresolvedCall,
-				List<ResolvedExpression> resolvedArgs) {
-			final PlannerExpression plannerCall = resolutionContext.bridge(unresolvedCall);
-
-			if (plannerCall instanceof InputTypeSpec) {
-				return resolveWithCastedAssignment(
-					unresolvedCall,
-					resolvedArgs,
-					toJava(((InputTypeSpec) plannerCall).expectedTypes()),
-					plannerCall.resultType());
-			} else {
-				validateArguments(plannerCall);
-
-				return unresolvedCall.resolve(
-					resolvedArgs,
-					fromLegacyInfoToDataType(plannerCall.resultType()));
-			}
-		}
-
-		private ResolvedExpression resolveWithCastedAssignment(
-				UnresolvedCallExpression unresolvedCall,
-				List<ResolvedExpression> args,
-				List<TypeInformation<?>> expectedTypes,
-				TypeInformation<?> resultType) {
-
-			final List<PlannerExpression> plannerArgs = unresolvedCall.getChildren()
-				.stream()
-				.map(resolutionContext::bridge)
-				.collect(Collectors.toList());
-
-			final List<ResolvedExpression> castedArgs = IntStream.range(0, plannerArgs.size())
-				.mapToObj(idx -> castIfNeeded(
-					args.get(idx),
-					plannerArgs.get(idx),
-					expectedTypes.get(idx)))
-				.collect(Collectors.toList());
-
-			return unresolvedCall.resolve(
-				castedArgs,
-				fromLegacyInfoToDataType(resultType));
-		}
-
-		private void validateArguments(PlannerExpression plannerCall) {
-			if (!plannerCall.valid()) {
-				throw new ValidationException(
-					getValidationErrorMessage(plannerCall)
-						.orElse("Unexpected behavior, validation failed but can't get error messages!"));
-			}
-		}
-
-		/**
-		 * Return the validation error message of this {@link PlannerExpression} or return the
-		 * validation error message of it's children if it passes the validation. Return empty if
-		 * all validation succeeded.
-		 */
-		private Optional<String> getValidationErrorMessage(PlannerExpression plannerCall) {
-			ValidationResult validationResult = plannerCall.validateInput();
-			if (validationResult instanceof ValidationFailure) {
-				return Optional.of(((ValidationFailure) validationResult).message());
-			} else {
-				for (Expression plannerExpression: plannerCall.getChildren()) {
-					Optional<String> errorMessage = getValidationErrorMessage((PlannerExpression) plannerExpression);
-					if (errorMessage.isPresent()) {
-						return errorMessage;
-					}
-				}
-			}
-			return Optional.empty();
-		}
-
-		private ResolvedExpression castIfNeeded(
-				ResolvedExpression child,
-				PlannerExpression plannerChild,
-				TypeInformation<?> expectedType) {
-			TypeInformation<?> actualType = plannerChild.resultType();
-			if (actualType.equals(expectedType)) {
-				return child;
-			} else if (TypeCoercion.canSafelyCast(actualType, expectedType)) {
-				return resolutionContext
-					.postResolutionFactory()
-					.cast(child, fromLegacyInfoToDataType(expectedType));
-			} else {
-				throw new ValidationException(String.format("Incompatible type of argument: %s Expected: %s",
-					child,
-					expectedType));
-			}
-		}
 	}
 
 	// --------------------------------------------------------------------------------------------


[flink] 05/05: [FLINK-13028][table-api-java] Merge flatten and call resolution rule

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 036c5492a9bfe7636d5e7d5eb664af5e77952707
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 1 15:25:04 2019 +0200

    [FLINK-13028][table-api-java] Merge flatten and call resolution rule
---
 .../expressions/resolver/ExpressionResolver.java   |   3 +-
 .../resolver/rules/ResolveCallByArgumentsRule.java |  94 ++++++++++++-------
 .../resolver/rules/ResolveFlattenCallRule.java     | 101 ---------------------
 .../expressions/resolver/rules/ResolverRules.java  |   5 -
 .../flink/table/api/batch/table/CalcTest.scala     |   1 +
 .../table/validation/CalcValidationTest.scala      |  12 ---
 6 files changed, 62 insertions(+), 154 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 4c1e62d..e4c07fa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -90,8 +90,7 @@ public class ExpressionResolver {
 			ResolverRules.OVER_WINDOWS,
 			ResolverRules.FIELD_RESOLVE,
 			ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
-			ResolverRules.RESOLVE_CALL_BY_ARGUMENTS,
-			ResolverRules.FLATTEN_CALL);
+			ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
 	}
 
 	private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR = new VerifyResolutionVisitor();
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 73429f4..1f184c8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -38,21 +40,26 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.singletonList;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers
  * the output data type. All function calls are resolved {@link CallExpression} after applying this
- * rule except for the special case of {@link BuiltInFunctionDefinitions#FLATTEN}.
+ * rule.
+ *
+ * <p>This rule also resolves {@code flatten()} calls on composite types.
  *
  * <p>If the call expects different types of arguments, but the given arguments have types that can
  * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
- *
- * @see ResolveFlattenCallRule
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {
@@ -60,39 +67,27 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 	@Override
 	public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
 		return expression.stream()
-			.map(expr -> expr.accept(new CallArgumentsCastingVisitor(context)))
+			.flatMap(expr -> expr.accept(new ResolvingCallVisitor(context)).stream())
 			.collect(Collectors.toList());
 	}
 
-	private class CallArgumentsCastingVisitor extends RuleExpressionVisitor<Expression> {
+	// --------------------------------------------------------------------------------------------
+
+	private class ResolvingCallVisitor extends RuleExpressionVisitor<List<ResolvedExpression>> {
 
-		CallArgumentsCastingVisitor(ResolutionContext context) {
+		ResolvingCallVisitor(ResolutionContext context) {
 			super(context);
 		}
 
 		@Override
-		public Expression visit(UnresolvedCallExpression unresolvedCall) {
+		public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
 
 			final List<ResolvedExpression> resolvedArgs = unresolvedCall.getChildren().stream()
-				.map(c -> c.accept(this))
-				.map(e -> {
-					// special case: FLATTEN
-					// a call chain `myFunc().flatten().flatten()` is not allowed
-					if (e instanceof UnresolvedCallExpression &&
-							((UnresolvedCallExpression) e).getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
-						throw new ValidationException("Consecutive flattening calls are not allowed.");
-					}
-					if (e instanceof ResolvedExpression) {
-						return (ResolvedExpression) e;
-					}
-					throw new TableException("Unexpected unresolved expression: " + e);
-				})
+				.flatMap(c -> c.accept(this).stream())
 				.collect(Collectors.toList());
 
-			// FLATTEN is a special case and the only call that remains unresolved after this rule
-			// it will be resolved by ResolveFlattenCallRule
 			if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
-				return unresolvedCall.replaceArgs(new ArrayList<>(resolvedArgs));
+				return executeFlatten(resolvedArgs);
 			}
 
 			if (unresolvedCall.getFunctionDefinition() instanceof BuiltInFunctionDefinition) {
@@ -100,13 +95,49 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 					(BuiltInFunctionDefinition) unresolvedCall.getFunctionDefinition();
 
 				if (definition.getTypeInference().getOutputTypeStrategy() != TypeStrategies.MISSING) {
-					return runTypeInference(
-						unresolvedCall,
-						definition.getTypeInference(),
-						resolvedArgs);
+					return Collections.singletonList(
+						runTypeInference(
+							unresolvedCall,
+							definition.getTypeInference(),
+							resolvedArgs));
 				}
 			}
-			return runLegacyTypeInference(unresolvedCall, resolvedArgs);
+			return Collections.singletonList(
+				runLegacyTypeInference(unresolvedCall, resolvedArgs));
+		}
+
+		@Override
+		protected List<ResolvedExpression> defaultMethod(Expression expression) {
+			if (expression instanceof ResolvedExpression) {
+				return Collections.singletonList((ResolvedExpression) expression);
+			}
+			throw new TableException("Unexpected unresolved expression: " + expression);
+		}
+
+		private List<ResolvedExpression> executeFlatten(List<ResolvedExpression> args) {
+			if (args.size() != 1) {
+				throw new ValidationException("Invalid number of arguments for flattening.");
+			}
+			final ResolvedExpression composite = args.get(0);
+			// TODO support the new type system with ROW and STRUCTURED_TYPE
+			final TypeInformation<?> resultType = fromDataTypeToLegacyInfo(composite.getOutputDataType());
+			if (resultType instanceof CompositeType) {
+				return flattenCompositeType(composite, (CompositeType<?>) resultType);
+			} else {
+				return singletonList(composite);
+			}
+		}
+
+		private List<ResolvedExpression> flattenCompositeType(ResolvedExpression composite, CompositeType<?> resultType) {
+			return IntStream.range(0, resultType.getArity())
+				.mapToObj(idx ->
+					resolutionContext.postResolutionFactory()
+						.get(
+							composite,
+							valueLiteral(resultType.getFieldNames()[idx]),
+							fromLegacyInfoToDataType(resultType.getTypeAt(idx)))
+				)
+				.collect(Collectors.toList());
 		}
 
 		private ResolvedExpression runTypeInference(
@@ -163,11 +194,6 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 				})
 				.collect(Collectors.toList());
 		}
-
-		@Override
-		protected Expression defaultMethod(Expression expression) {
-			return expression;
-		}
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
deleted file mode 100644
index 53d7750..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions.resolver.rules;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.UnresolvedCallExpression;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.util.Collections.singletonList;
-import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
-import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
-import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
-
-/**
- * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with resolved calls to {@link BuiltInFunctionDefinitions#GET}
- * for all fields of underlying field of complex type.
- *
- * @see ResolveCallByArgumentsRule
- */
-@Internal
-final class ResolveFlattenCallRule implements ResolverRule {
-
-	@Override
-	public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
-		return expression.stream()
-			.flatMap(expr -> expr.accept(new FlatteningCallVisitor(context)).stream())
-			.collect(Collectors.toList());
-	}
-
-	private class FlatteningCallVisitor extends RuleExpressionVisitor<List<Expression>> {
-
-		FlatteningCallVisitor(ResolutionContext context) {
-			super(context);
-		}
-
-		@Override
-		public List<Expression> visit(UnresolvedCallExpression unresolvedCall) {
-			if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
-				return executeFlatten(unresolvedCall);
-			}
-
-			return singletonList(unresolvedCall);
-		}
-
-		private List<Expression> executeFlatten(UnresolvedCallExpression unresolvedCall) {
-			final Expression composite = unresolvedCall.getChildren().get(0);
-			if (!(composite instanceof ResolvedExpression)) {
-				throw new TableException("Resolved expression expected for flattening.");
-			}
-			final ResolvedExpression resolvedComposite = (ResolvedExpression) composite;
-			final TypeInformation<?> resultType = fromDataTypeToLegacyInfo(resolvedComposite.getOutputDataType());
-			if (resultType instanceof CompositeType) {
-				return flattenCompositeType(resolvedComposite, (CompositeType<?>) resultType);
-			} else {
-				return singletonList(composite);
-			}
-		}
-
-		private List<Expression> flattenCompositeType(ResolvedExpression resolvedComposite, CompositeType<?> resultType) {
-			return IntStream.range(0, resultType.getArity())
-				.mapToObj(idx ->
-					resolutionContext.postResolutionFactory()
-						.get(
-							resolvedComposite,
-							valueLiteral(resultType.getFieldNames()[idx]),
-							fromLegacyInfoToDataType(resultType.getTypeAt(idx)))
-				)
-				.collect(Collectors.toList());
-		}
-
-		@Override
-		protected List<Expression> defaultMethod(Expression expression) {
-			return singletonList(expression);
-		}
-	}
-}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index 915f6a5..671b1e1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -28,11 +28,6 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 public final class ResolverRules {
 
 	/**
-	 * Rule that resolves flatten call. See {@link ResolveFlattenCallRule} for details.
-	 */
-	public static final ResolverRule FLATTEN_CALL = new ResolveFlattenCallRule();
-
-	/**
 	 * Resolves {@link UnresolvedReferenceExpression}. See {@link ReferenceResolverRule} for details.
 	 */
 	public static final ResolverRule FIELD_RESOLVE = new ReferenceResolverRule();
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
index 7f8bc81..b688d4a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CalcTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.batch.table
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.api.batch.table.CalcTest.{MyHashCode, TestCaseClass, WC, giveMeCaseClass}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.functions.ScalarFunction
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
index c4216e1..b9e9f21 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
@@ -161,16 +161,4 @@ class CalcValidationTest extends TableTestBase {
     util.addTable[(Int, Long, String)]("MyTable", 'int, 'long, 'string)
       .select('int, 'long.log as 'long, 'string)
   }
-
-  @Test
-  def testConsecutiveFlattening(): Unit = {
-    expectedException.expect(classOf[ValidationException])
-    expectedException.expectMessage("Consecutive flattening calls are not allowed.")
-
-    val util = streamTestUtil()
-    util.addTable[(Long, Int)](
-      "MyTable",
-      'tuple)
-    .select('tuple.flatten().flatten())
-  }
 }


[flink] 02/05: [FLINK-13028][table-api-java] Remove planner expression from ExpressionResolver

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 272ac71328aac0583df859bf197e30afcec0981f
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 1 08:25:05 2019 +0200

    [FLINK-13028][table-api-java] Remove planner expression from ExpressionResolver
---
 .../table/expressions/ExpressionResolver.java      | 98 +++-------------------
 .../expressions/rules/OverWindowResolverRule.java  | 50 ++++++++++-
 .../rules/ResolveCallByArgumentsRule.java          | 26 ++++--
 ...enCallRule.java => ResolveFlattenCallRule.java} | 39 +++++----
 .../table/expressions/rules/ResolverRule.java      |  6 --
 .../table/expressions/rules/ResolverRules.java     |  4 +-
 .../table/validation/CalcValidationTest.scala      | 12 +++
 7 files changed, 119 insertions(+), 116 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
index 394180b..015751c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
@@ -19,14 +19,9 @@
 package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.OverWindow;
-import org.apache.flink.table.api.SessionWithGapOnTimeWithAlias;
-import org.apache.flink.table.api.SlideWithSizeAndSlideOnTimeWithAlias;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
 import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
@@ -36,15 +31,9 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.plan.logical.LogicalOverWindow;
-import org.apache.flink.table.plan.logical.LogicalWindow;
-import org.apache.flink.table.plan.logical.SessionGroupWindow;
-import org.apache.flink.table.plan.logical.SlidingGroupWindow;
-import org.apache.flink.table.plan.logical.TumblingGroupWindow;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -54,11 +43,8 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import scala.Some;
-
 import static org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral;
 import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
-import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
  * Tries to resolve all unresolved expressions such as {@link UnresolvedReferenceExpression}
@@ -97,15 +83,13 @@ public class ExpressionResolver {
 			ResolverRules.EXPAND_COLUMN_FUNCTIONS,
 			ResolverRules.OVER_WINDOWS,
 			ResolverRules.FIELD_RESOLVE,
-			ResolverRules.FLATTEN_CALL,
 			ResolverRules.QUALIFY_BUILT_IN_FUNCTIONS,
-			ResolverRules.RESOLVE_CALL_BY_ARGUMENTS);
+			ResolverRules.RESOLVE_CALL_BY_ARGUMENTS,
+			ResolverRules.FLATTEN_CALL);
 	}
 
 	private static final VerifyResolutionVisitor VERIFY_RESOLUTION_VISITOR = new VerifyResolutionVisitor();
 
-	private final PlannerExpressionConverter bridgeConverter = PlannerExpressionConverter.INSTANCE();
-
 	private final FieldReferenceLookup fieldLookup;
 
 	private final TableReferenceLookup tableLookup;
@@ -187,54 +171,6 @@ public class ExpressionResolver {
 	}
 
 	/**
-	 * Converts an API class to a logical window for planning with expressions already resolved.
-	 *
-	 * @param window window to resolve
-	 * @return logical window with expressions resolved
-	 */
-	public LogicalWindow resolveGroupWindow(GroupWindow window) {
-		Expression alias = window.getAlias();
-
-		if (!(alias instanceof UnresolvedReferenceExpression)) {
-			throw new ValidationException("Alias of group window should be an UnresolvedFieldReference");
-		}
-
-		final String windowName = ((UnresolvedReferenceExpression) alias).getName();
-		List<Expression> resolvedTimeFieldExpression =
-			prepareExpressions(Collections.singletonList(window.getTimeField()));
-		if (resolvedTimeFieldExpression.size() != 1) {
-			throw new ValidationException("Group Window only supports a single time field column.");
-		}
-		PlannerExpression timeField = resolvedTimeFieldExpression.get(0).accept(bridgeConverter);
-
-		//TODO replace with LocalReferenceExpression
-		WindowReference resolvedAlias = new WindowReference(windowName, new Some<>(timeField.resultType()));
-
-		if (window instanceof TumbleWithSizeOnTimeWithAlias) {
-			TumbleWithSizeOnTimeWithAlias tw = (TumbleWithSizeOnTimeWithAlias) window;
-			return new TumblingGroupWindow(
-				resolvedAlias,
-				timeField,
-				resolveFieldsInSingleExpression(tw.getSize()).accept(bridgeConverter));
-		} else if (window instanceof SlideWithSizeAndSlideOnTimeWithAlias) {
-			SlideWithSizeAndSlideOnTimeWithAlias sw = (SlideWithSizeAndSlideOnTimeWithAlias) window;
-			return new SlidingGroupWindow(
-				resolvedAlias,
-				timeField,
-				resolveFieldsInSingleExpression(sw.getSize()).accept(bridgeConverter),
-				resolveFieldsInSingleExpression(sw.getSlide()).accept(bridgeConverter));
-		} else if (window instanceof SessionWithGapOnTimeWithAlias) {
-			SessionWithGapOnTimeWithAlias sw = (SessionWithGapOnTimeWithAlias) window;
-			return new SessionGroupWindow(
-				resolvedAlias,
-				timeField,
-				resolveFieldsInSingleExpression(sw.getGap()).accept(bridgeConverter));
-		} else {
-			throw new TableException("Unknown window type");
-		}
-	}
-
-	/**
 	 * Enables the creation of resolved expressions for transformations after the actual resolution.
 	 */
 	public PostResolverFactory postResolverFactory() {
@@ -251,20 +187,6 @@ public class ExpressionResolver {
 			);
 	}
 
-	private void prepareLocalReferencesFromGroupWindows(@Nullable GroupWindow groupWindow) {
-		if (groupWindow != null) {
-			String windowName = ((UnresolvedReferenceExpression) groupWindow.getAlias()).getName();
-			TypeInformation<?> windowType =
-				prepareExpressions(Collections.singletonList(groupWindow.getTimeField())).get(0)
-					.accept(bridgeConverter)
-					.resultType();
-
-			localReferences.put(
-				windowName,
-				new LocalReferenceExpression(windowName, fromLegacyInfoToDataType(windowType)));
-		}
-	}
-
 	private Map<Expression, LogicalOverWindow> prepareOverWindows(List<OverWindow> overWindows) {
 		return overWindows.stream()
 			.map(this::resolveOverWindow)
@@ -343,11 +265,6 @@ public class ExpressionResolver {
 		public Optional<LogicalOverWindow> getOverWindow(Expression alias) {
 			return Optional.ofNullable(overWindows.get(alias));
 		}
-
-		@Override
-		public PlannerExpression bridge(Expression expression) {
-			return expression.accept(bridgeConverter);
-		}
 	}
 
 	private LogicalOverWindow resolveOverWindow(OverWindow overWindow) {
@@ -401,6 +318,17 @@ public class ExpressionResolver {
 				Collections.singletonList(expression),
 				expression.getOutputDataType()); // the output type is equal to the input type
 		}
+
+		public CallExpression get(ResolvedExpression composite, ValueLiteralExpression key, DataType dataType) {
+			final FunctionLookup.Result lookupOfGet = functionLookup
+				.lookupBuiltInFunction(BuiltInFunctionDefinitions.GET);
+
+			return new CallExpression(
+				lookupOfGet.getObjectIdentifier(),
+				lookupOfGet.getFunctionDefinition(),
+				Arrays.asList(composite, key),
+				dataType);
+		}
 	}
 
 	/**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
index 92fc177..fcec1ac 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
@@ -19,13 +19,15 @@
 package org.apache.flink.table.expressions.rules;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.plan.logical.LogicalOverWindow;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -33,6 +35,9 @@ import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
 
 /**
  * Joins call to {@link BuiltInFunctionDefinitions#OVER} with corresponding over window
@@ -41,6 +46,8 @@ import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCa
 @Internal
 final class OverWindowResolverRule implements ResolverRule {
 
+	private static final WindowKindExtractor OVER_WINDOW_KIND_EXTRACTOR = new WindowKindExtractor();
+
 	@Override
 	public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
 		return expression.stream()
@@ -85,8 +92,8 @@ final class OverWindowResolverRule implements ResolverRule {
 
 		private Expression calculateOverWindowFollowing(LogicalOverWindow referenceWindow) {
 			return referenceWindow.following().orElseGet(() -> {
-					PlannerExpression preceding = resolutionContext.bridge(referenceWindow.preceding());
-					if (preceding.resultType() == BasicTypeInfo.LONG_TYPE_INFO) {
+					WindowKind kind = referenceWindow.preceding().accept(OVER_WINDOW_KIND_EXTRACTOR);
+					if (kind == WindowKind.ROW) {
 						return unresolvedCall(BuiltInFunctionDefinitions.CURRENT_ROW);
 					} else {
 						return unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE);
@@ -100,4 +107,39 @@ final class OverWindowResolverRule implements ResolverRule {
 			return expression;
 		}
 	}
+
+	private enum WindowKind {
+		ROW,
+		RANGE
+	}
+
+	private static class WindowKindExtractor extends ApiExpressionDefaultVisitor<WindowKind> {
+
+		@Override
+		public WindowKind visit(ValueLiteralExpression valueLiteral) {
+			final LogicalType literalType = valueLiteral.getOutputDataType().getLogicalType();
+			if (hasRoot(literalType, BIGINT)) {
+				return WindowKind.ROW;
+			} else if (hasRoot(literalType, INTERVAL_DAY_TIME)) {
+				return WindowKind.RANGE;
+			}
+			return defaultMethod(valueLiteral);
+		}
+
+		@Override
+		public WindowKind visit(UnresolvedCallExpression unresolvedCall) {
+			final FunctionDefinition definition = unresolvedCall.getFunctionDefinition();
+			if (definition == BuiltInFunctionDefinitions.UNBOUNDED_ROW) {
+				return WindowKind.ROW;
+			} else if (definition == BuiltInFunctionDefinitions.UNBOUNDED_RANGE) {
+				return WindowKind.RANGE;
+			}
+			return defaultMethod(unresolvedCall);
+		}
+
+		@Override
+		protected WindowKind defaultMethod(Expression expression) {
+			throw new ValidationException("An over window expects literal or unbounded bounds for preceding.");
+		}
+	}
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
index d2ad89c..23aa7d4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.expressions.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
 import org.apache.flink.table.expressions.CallExpression;
@@ -37,6 +38,7 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -44,11 +46,13 @@ import java.util.stream.IntStream;
 
 /**
  * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers
- * the output data type. All function calls are resolved {@link CallExpression} after applying this rule.
+ * the output data type. All function calls are resolved {@link CallExpression} after applying this
+ * rule except for the special case of {@link BuiltInFunctionDefinitions#FLATTEN}.
  *
- * <p>If the call expects different types of arguments, but the given arguments
- * have types that can be casted, a {@link BuiltInFunctionDefinitions#CAST}
- * expression is inserted.
+ * <p>If the call expects different types of arguments, but the given arguments have types that can
+ * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
+ *
+ * @see ResolveFlattenCallRule
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {
@@ -67,11 +71,17 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 		}
 
 		@Override
-		public ResolvedExpression visit(UnresolvedCallExpression unresolvedCall) {
+		public Expression visit(UnresolvedCallExpression unresolvedCall) {
 
 			final List<ResolvedExpression> resolvedArgs = unresolvedCall.getChildren().stream()
 				.map(c -> c.accept(this))
 				.map(e -> {
+					// special case: FLATTEN
+					// a call chain `myFunc().flatten().flatten()` is not allowed
+					if (e instanceof UnresolvedCallExpression &&
+							((UnresolvedCallExpression) e).getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
+						throw new ValidationException("Consecutive flattening calls are not allowed.");
+					}
 					if (e instanceof ResolvedExpression) {
 						return (ResolvedExpression) e;
 					}
@@ -79,6 +89,12 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 				})
 				.collect(Collectors.toList());
 
+			// FLATTEN is a special case and the only call that remains unresolved after this rule
+			// it will be resolved by ResolveFlattenCallRule
+			if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
+				return unresolvedCall.replaceArgs(new ArrayList<>(resolvedArgs));
+			}
+
 			if (unresolvedCall.getFunctionDefinition() instanceof BuiltInFunctionDefinition) {
 				final BuiltInFunctionDefinition definition =
 					(BuiltInFunctionDefinition) unresolvedCall.getFunctionDefinition();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/FlattenCallRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java
similarity index 63%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/FlattenCallRule.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java
index 7a07b43..1d2a394 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/FlattenCallRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java
@@ -21,8 +21,9 @@ package org.apache.flink.table.expressions.rules;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 
@@ -31,16 +32,18 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonList;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
 import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
-import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.GET;
+import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
- * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with calls to {@link BuiltInFunctionDefinitions#GET} for all
- * fields of underlying field of complex type.
+ * Replaces {@link BuiltInFunctionDefinitions#FLATTEN} with resolved calls to {@link BuiltInFunctionDefinitions#GET}
+ * for all fields of underlying field of complex type.
+ *
+ * @see ResolveCallByArgumentsRule
  */
 @Internal
-final class FlattenCallRule implements ResolverRule {
+final class ResolveFlattenCallRule implements ResolverRule {
 
 	@Override
 	public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
@@ -65,20 +68,28 @@ final class FlattenCallRule implements ResolverRule {
 		}
 
 		private List<Expression> executeFlatten(UnresolvedCallExpression unresolvedCall) {
-			Expression arg = unresolvedCall.getChildren().get(0);
-			PlannerExpression plannerExpression = resolutionContext.bridge(arg);
-			plannerExpression.validateInput();
-			TypeInformation<?> resultType = plannerExpression.resultType();
+			final Expression composite = unresolvedCall.getChildren().get(0);
+			if (!(composite instanceof ResolvedExpression)) {
+				throw new TableException("Resolved expression expected for flattening.");
+			}
+			final ResolvedExpression resolvedComposite = (ResolvedExpression) composite;
+			final TypeInformation<?> resultType = fromDataTypeToLegacyInfo(resolvedComposite.getOutputDataType());
 			if (resultType instanceof CompositeType) {
-				return flattenCompositeType(arg, (CompositeType<?>) resultType);
+				return flattenCompositeType(resolvedComposite, (CompositeType<?>) resultType);
 			} else {
-				return singletonList(arg);
+				return singletonList(composite);
 			}
 		}
 
-		private List<Expression> flattenCompositeType(Expression arg, CompositeType<?> resultType) {
+		private List<Expression> flattenCompositeType(ResolvedExpression resolvedComposite, CompositeType<?> resultType) {
 			return IntStream.range(0, resultType.getArity())
-				.mapToObj(idx -> unresolvedCall(GET, arg, valueLiteral(resultType.getFieldNames()[idx])))
+				.mapToObj(idx ->
+					resolutionContext.postResolutionFactory()
+						.get(
+							resolvedComposite,
+							valueLiteral(resultType.getFieldNames()[idx]),
+							fromLegacyInfoToDataType(resultType.getTypeAt(idx)))
+				)
 				.collect(Collectors.toList());
 		}
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
index e5e43ef..af51499 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionResolver;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
-import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
 import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -77,10 +76,5 @@ public interface ResolverRule {
 		 * Lookup for over windows.
 		 */
 		Optional<LogicalOverWindow> getOverWindow(Expression alias);
-
-		/**
-		 * Temporary way to convert expression to PlannerExpression to evaluate type.
-		 */
-		PlannerExpression bridge(Expression expression);
 	}
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
index 84ea332..d68899e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
@@ -28,9 +28,9 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 public final class ResolverRules {
 
 	/**
-	 * Rule that resolves flatten call. See {@link FlattenCallRule} for details.
+	 * Rule that resolves flatten call. See {@link ResolveFlattenCallRule} for details.
 	 */
-	public static final ResolverRule FLATTEN_CALL = new FlattenCallRule();
+	public static final ResolverRule FLATTEN_CALL = new ResolveFlattenCallRule();
 
 	/**
 	 * Resolves {@link UnresolvedReferenceExpression}. See {@link ReferenceResolverRule} for details.
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
index b9e9f21..c4216e1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala
@@ -161,4 +161,16 @@ class CalcValidationTest extends TableTestBase {
     util.addTable[(Int, Long, String)]("MyTable", 'int, 'long, 'string)
       .select('int, 'long.log as 'long, 'string)
   }
+
+  @Test
+  def testConsecutiveFlattening(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage("Consecutive flattening calls are not allowed.")
+
+    val util = streamTestUtil()
+    util.addTable[(Long, Int)](
+      "MyTable",
+      'tuple)
+    .select('tuple.flatten().flatten())
+  }
 }


[flink] 04/05: [FLINK-13028][table] Refactor expression package structure

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 14fe641fc97888ccc7035051a697334ec39a0e62
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 1 10:49:50 2019 +0200

    [FLINK-13028][table] Refactor expression package structure
---
 .../table/api/OverWindowPartitionedOrdered.java     |  2 +-
 .../table/api/internal/TableEnvironmentImpl.java    |  2 +-
 .../apache/flink/table/api/internal/TableImpl.java  |  2 +-
 .../PlannerExpressionParser.java                    |  8 +++++---
 .../flink/table/expressions/ExpressionParser.java   |  1 +
 .../expressions/resolver}/ExpressionResolver.java   | 21 ++++++++++++++-------
 .../expressions/{ => resolver}/LocalOverWindow.java |  5 ++---
 .../{ => resolver}/LookupCallResolver.java          |  6 +++++-
 .../resolver}/lookups/FieldReferenceLookup.java     |  2 +-
 .../lookups/TableReferenceLookup.java               |  2 +-
 .../resolver}/rules/ExpandColumnFunctionsRule.java  |  8 ++++----
 .../resolver}/rules/LookupCallByNameRule.java       |  4 ++--
 .../resolver}/rules/OverWindowResolverRule.java     |  8 ++++----
 .../rules/QualifyBuiltInFunctionsRule.java          |  2 +-
 .../resolver}/rules/ReferenceResolverRule.java      |  7 ++++---
 .../resolver}/rules/ResolveCallByArgumentsRule.java |  2 +-
 .../resolver}/rules/ResolveFlattenCallRule.java     |  4 ++--
 .../expressions/resolver}/rules/ResolverRule.java   | 10 +++++-----
 .../expressions/resolver}/rules/ResolverRules.java  |  2 +-
 .../resolver}/rules/RuleExpressionVisitor.java      |  6 +++---
 .../rules/StarReferenceFlatteningRule.java          |  2 +-
 .../{ => utils}/ApiExpressionDefaultVisitor.java    | 13 ++++++++++++-
 .../expressions/{ => utils}/ApiExpressionUtils.java | 11 ++++++++++-
 .../ResolvedExpressionDefaultVisitor.java           | 10 +++++++++-
 .../table/operations/OperationExpressionsUtils.java | 10 +++++-----
 .../flink/table/typeutils/FieldInfoUtils.java       |  2 +-
 .../flink/table/operations/QueryOperationTest.java  |  2 +-
 .../flink/table/expressions/ExpressionBuilder.java  |  1 +
 .../functions/aggfunctions/AvgAggFunction.java      |  2 +-
 .../functions/aggfunctions/ConcatAggFunction.java   |  2 +-
 .../functions/aggfunctions/Count1AggFunction.java   |  2 +-
 .../functions/aggfunctions/CountAggFunction.java    |  2 +-
 .../aggfunctions/DeclarativeAggregateFunction.java  |  2 +-
 .../functions/aggfunctions/IncrSumAggFunction.java  |  2 +-
 .../aggfunctions/IncrSumWithRetractAggFunction.java |  2 +-
 .../functions/aggfunctions/LeadLagAggFunction.java  |  2 +-
 .../functions/aggfunctions/MaxAggFunction.java      |  2 +-
 .../functions/aggfunctions/MinAggFunction.java      |  2 +-
 .../functions/aggfunctions/RankAggFunction.java     |  2 +-
 .../aggfunctions/RankLikeAggFunctionBase.java       |  2 +-
 .../aggfunctions/RowNumberAggFunction.java          |  2 +-
 .../aggfunctions/SingleValueAggFunction.java        |  2 +-
 .../functions/aggfunctions/Sum0AggFunction.java     |  2 +-
 .../functions/aggfunctions/SumAggFunction.java      |  2 +-
 .../aggfunctions/SumWithRetractAggFunction.java     |  2 +-
 .../table/codegen/agg/DeclarativeAggCodeGen.scala   |  4 ++--
 .../table/codegen/agg/batch/AggCodeGenHelper.scala  |  3 ++-
 .../codegen/agg/batch/HashAggCodeGenHelper.scala    |  3 ++-
 .../logical/LogicalWindowAggregateRuleBase.scala    |  2 +-
 .../flink/table/plan/util/RexNodeExtractor.scala    |  2 +-
 .../flink/table/sources/TableSourceUtil.scala       |  2 +-
 .../table/sources/tsextractors/ExistingField.scala  |  2 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala   |  2 +-
 .../table/plan/util/RexNodeExtractorTest.scala      |  2 +-
 .../apache/flink/table/util/testTableSources.scala  |  2 +-
 .../table/operations/AggregateOperationFactory.java |  6 +++---
 .../flink/table/operations/AliasOperationUtils.java |  8 ++++----
 .../table/operations/CalculatedTableFactory.java    |  4 ++--
 .../table/operations/ColumnOperationUtils.java      |  6 +++---
 .../table/operations/JoinOperationFactory.java      |  2 +-
 .../operations/OperationTreeBuilderFactory.java     |  2 +-
 .../operations/ProjectionOperationFactory.java      |  4 ++--
 .../table/operations/SortOperationFactory.java      |  4 ++--
 .../flink/table/plan/QueryOperationConverter.java   |  4 ++--
 .../table/api/internal/BatchTableEnvImpl.scala      |  3 ++-
 .../flink/table/api/internal/TableEnvImpl.scala     |  2 +-
 .../flink/table/api/scala/expressionDsl.scala       |  2 +-
 .../flink/table/expressions/ExpressionBridge.scala  |  1 +
 .../expressions/PlannerExpressionParserImpl.scala   |  3 ++-
 .../table/operations/OperationTreeBuilderImpl.scala |  8 +++++---
 .../flink/table/plan/util/RexProgramExtractor.scala |  2 +-
 .../flink/table/expressions/KeywordParseTest.scala  |  2 +-
 72 files changed, 163 insertions(+), 113 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
index d14dfff..34bf535 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
 
 /**
  * Partially defined over window with (optional) partitioning and order.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 97f0631..40849aa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -46,7 +46,7 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.descriptors.TableDescriptor;
 import org.apache.flink.table.expressions.TableReferenceExpression;
-import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 87223b6..79f8502 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -36,7 +36,7 @@ import org.apache.flink.table.api.WindowGroupedTable;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
-import org.apache.flink.table.expressions.LookupCallResolver;
+import org.apache.flink.table.expressions.resolver.LookupCallResolver;
 import org.apache.flink.table.functions.TemporalTableFunction;
 import org.apache.flink.table.functions.TemporalTableFunctionImpl;
 import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/PlannerExpressionParser.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java
similarity index 87%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/PlannerExpressionParser.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java
index c347025..9985095 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/PlannerExpressionParser.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerExpressionParser.java
@@ -16,17 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
 
 import java.lang.reflect.Constructor;
 import java.util.List;
 
 /**
- * Parser for expressions inside a String. This parses exactly the same expressions that
- * would be accepted by the Scala Expression DSL.
+ * Temporary utility for parsing expressions inside a String. This parses exactly the same expressions
+ * that would be accepted by the Scala Expression DSL.
  *
  * <p>{@link PlannerExpressionParser} is used by {@link ExpressionParser} to parse expressions.
  */
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
index d576cf5..c097f68 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.delegation.PlannerExpressionParser;
 
 import java.util.List;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
similarity index 92%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index ea45f33..4c1e62d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/ExpressionResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -16,17 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.expressions.resolver;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.OverWindow;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.FunctionLookup;
-import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
-import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
-import org.apache.flink.table.expressions.rules.ResolverRule;
-import org.apache.flink.table.expressions.rules.ResolverRules;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.resolver.rules.ResolverRule;
+import org.apache.flink.table.expressions.resolver.rules.ResolverRules;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.operations.QueryOperation;
@@ -42,8 +49,8 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.typeLiteral;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
 
 /**
  * Tries to resolve all unresolved expressions such as {@link UnresolvedReferenceExpression}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LocalOverWindow.java
similarity index 94%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LocalOverWindow.java
index 45e1a7a..d7ed813 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LocalOverWindow.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LocalOverWindow.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.expressions.resolver;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.Expression;
 
 import javax.annotation.Nullable;
 
@@ -28,7 +28,6 @@ import java.util.Optional;
 /**
  * Local over window created during expression resolution.
  */
-@Internal
 public final class LocalOverWindow {
 
 	private Expression alias;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
similarity index 87%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallResolver.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
index 4a978ae..1ad34e3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
@@ -16,11 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.expressions.resolver;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.FunctionLookup;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.LookupCallExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 
 import java.util.List;
 import java.util.stream.Collectors;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/lookups/FieldReferenceLookup.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/lookups/FieldReferenceLookup.java
similarity index 98%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/lookups/FieldReferenceLookup.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/lookups/FieldReferenceLookup.java
index 9ee13e6..d5aaa77 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/lookups/FieldReferenceLookup.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/lookups/FieldReferenceLookup.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.lookups;
+package org.apache.flink.table.expressions.resolver.lookups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableSchema;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/lookups/TableReferenceLookup.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/lookups/TableReferenceLookup.java
similarity index 95%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/lookups/TableReferenceLookup.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/lookups/TableReferenceLookup.java
index 0d6112c..bc3f9f1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/lookups/TableReferenceLookup.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/lookups/TableReferenceLookup.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.lookups;
+package org.apache.flink.table.expressions.resolver.lookups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.expressions.TableReferenceExpression;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ExpandColumnFunctionsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ExpandColumnFunctionsRule.java
similarity index 96%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ExpandColumnFunctionsRule.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ExpandColumnFunctionsRule.java
index b3fd2e7..bc206fe 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ExpandColumnFunctionsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ExpandColumnFunctionsRule.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.util.Preconditions;
 
@@ -35,8 +35,8 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RANGE_TO;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.WITHOUT_COLUMNS;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/LookupCallByNameRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/LookupCallByNameRule.java
similarity index 92%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/LookupCallByNameRule.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/LookupCallByNameRule.java
index 0d7142a..163758f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/LookupCallByNameRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/LookupCallByNameRule.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.LookupCallResolver;
+import org.apache.flink.table.expressions.resolver.LookupCallResolver;
 import org.apache.flink.table.functions.FunctionDefinition;
 
 import java.util.List;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
similarity index 94%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
index 3420437..3340589 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/OverWindowResolverRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
@@ -16,15 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.LocalOverWindow;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.resolver.LocalOverWindow;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -34,7 +34,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/QualifyBuiltInFunctionsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/QualifyBuiltInFunctionsRule.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java
index ffe5193..70b5e78 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/QualifyBuiltInFunctionsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/QualifyBuiltInFunctionsRule.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.FunctionLookup;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ReferenceResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ReferenceResolverRule.java
similarity index 92%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ReferenceResolverRule.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ReferenceResolverRule.java
index 89b351b..3c5b142 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ReferenceResolverRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ReferenceResolverRule.java
@@ -16,12 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 
@@ -29,13 +30,13 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
 
 /**
  * Resolves {@link UnresolvedReferenceExpression} to either
  * {@link org.apache.flink.table.expressions.FieldReferenceExpression},
  * {@link org.apache.flink.table.expressions.TableReferenceExpression}, or
- * {@link org.apache.flink.table.expressions.LocalReferenceExpression} in this order.
+ * {@link LocalReferenceExpression} in this order.
  */
 @Internal
 final class ReferenceResolverRule implements ResolverRule {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
similarity index 99%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 23aa7d4..73429f4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
similarity index 96%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
index 1d2a394..53d7750 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveFlattenCallRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveFlattenCallRule.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -32,7 +32,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonList;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
 import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
 import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
similarity index 87%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
index 802b550..0a7fed0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRule.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionResolver;
-import org.apache.flink.table.expressions.LocalOverWindow;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
-import org.apache.flink.table.expressions.lookups.FieldReferenceLookup;
-import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.expressions.resolver.LocalOverWindow;
+import org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup;
+import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.FunctionDefinition;
 
 import java.util.List;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index d68899e..915f6a5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolverRules.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/RuleExpressionVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/RuleExpressionVisitor.java
similarity index 84%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/RuleExpressionVisitor.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/RuleExpressionVisitor.java
index 59a2338..debd0ef 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/RuleExpressionVisitor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/RuleExpressionVisitor.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
-import org.apache.flink.table.expressions.rules.ResolverRule.ResolutionContext;
+import org.apache.flink.table.expressions.resolver.rules.ResolverRule.ResolutionContext;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 
 /**
  * Utility class for {@link ResolverRule} specific visitor that unifies access to
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/StarReferenceFlatteningRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/StarReferenceFlatteningRule.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/StarReferenceFlatteningRule.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/StarReferenceFlatteningRule.java
index 5283aed..5e58898 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/StarReferenceFlatteningRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/StarReferenceFlatteningRule.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions.rules;
+package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.expressions.Expression;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionDefaultVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
similarity index 80%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionDefaultVisitor.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
index b31550d..3eb8467 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionDefaultVisitor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionDefaultVisitor.java
@@ -16,9 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.expressions.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.ApiExpressionVisitor;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.LookupCallExpression;
+import org.apache.flink.table.expressions.TableReferenceExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
 
 /**
  * A utility {@link ApiExpressionVisitor} that calls {@link #defaultMethod(Expression)} by default,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java
similarity index 87%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java
index e651419..f04878f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java
@@ -16,12 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.expressions.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionUtils;
+import org.apache.flink.table.expressions.LookupCallExpression;
+import org.apache.flink.table.expressions.TableReferenceExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.types.DataType;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ResolvedExpressionDefaultVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ResolvedExpressionDefaultVisitor.java
similarity index 76%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ResolvedExpressionDefaultVisitor.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ResolvedExpressionDefaultVisitor.java
index 2aa25d8..785c6e6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ResolvedExpressionDefaultVisitor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ResolvedExpressionDefaultVisitor.java
@@ -16,9 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.expressions;
+package org.apache.flink.table.expressions.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ResolvedExpressionVisitor;
+import org.apache.flink.table.expressions.TableReferenceExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
 
 /**
  * A utility {@link ResolvedExpressionVisitor} that calls {@link #defaultMethod(ResolvedExpression)}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
index 7072a3f..8e9912b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
@@ -28,6 +27,7 @@ import org.apache.flink.table.expressions.LookupCallExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 
@@ -37,11 +37,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.isFunctionOfKind;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
 import static org.apache.flink.table.expressions.ExpressionUtils.extractValue;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.WINDOW_PROPERTIES;
 import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
index 321fb50..4877bc4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
@@ -28,11 +28,11 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.DataType;
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/QueryOperationTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/QueryOperationTest.java
index b9374c6..7c28839 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/QueryOperationTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/QueryOperationTest.java
@@ -29,7 +29,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis;
 import static org.junit.Assert.assertEquals;
 
 /**
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/ExpressionBuilder.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/ExpressionBuilder.java
index 6fc7f6f..333d763 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/ExpressionBuilder.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/ExpressionBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.expressions;
 
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.java
index ef656b7..8150843 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.types.logical.DecimalType;
 
 import java.math.BigDecimal;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.div;
 import static org.apache.flink.table.expressions.ExpressionBuilder.equalTo;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
@@ -36,6 +35,7 @@ import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
 import static org.apache.flink.table.expressions.ExpressionBuilder.minus;
 import static org.apache.flink.table.expressions.ExpressionBuilder.nullOf;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in avg aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/ConcatAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/ConcatAggFunction.java
index ed32b7d..19cc878 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/ConcatAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/ConcatAggFunction.java
@@ -24,12 +24,12 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.concat;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.isNull;
 import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
 import static org.apache.flink.table.expressions.ExpressionBuilder.nullOf;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in concat aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/Count1AggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/Count1AggFunction.java
index 975e1b6..ca94f5e 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/Count1AggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/Count1AggFunction.java
@@ -23,10 +23,10 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
 import static org.apache.flink.table.expressions.ExpressionBuilder.minus;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * This count1 aggregate function returns the count1 of values
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/CountAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/CountAggFunction.java
index cb115c4..a5a5639 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/CountAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/CountAggFunction.java
@@ -23,12 +23,12 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.isNull;
 import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
 import static org.apache.flink.table.expressions.ExpressionBuilder.minus;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in count aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java
index e688340..04c10c1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/DeclarativeAggregateFunction.java
@@ -29,7 +29,7 @@ import java.util.Arrays;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * API for aggregation functions that are expressed in terms of expressions.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/IncrSumAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/IncrSumAggFunction.java
index 29e3536..9735432 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/IncrSumAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/IncrSumAggFunction.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.isNull;
 import static org.apache.flink.table.expressions.ExpressionBuilder.lessThan;
@@ -34,6 +33,7 @@ import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
 import static org.apache.flink.table.expressions.ExpressionBuilder.nullOf;
 import static org.apache.flink.table.expressions.ExpressionBuilder.or;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in IncrSum aggregate function,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/IncrSumWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/IncrSumWithRetractAggFunction.java
index 8a85b07..a6513be 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/IncrSumWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/IncrSumWithRetractAggFunction.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.equalTo;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.isNull;
@@ -35,6 +34,7 @@ import static org.apache.flink.table.expressions.ExpressionBuilder.minus;
 import static org.apache.flink.table.expressions.ExpressionBuilder.nullOf;
 import static org.apache.flink.table.expressions.ExpressionBuilder.or;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in IncrSum with retract aggregate function,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LeadLagAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LeadLagAggFunction.java
index 890677c..7b7c107 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LeadLagAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/LeadLagAggFunction.java
@@ -26,10 +26,10 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.TimeType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.cast;
 import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
 import static org.apache.flink.table.expressions.ExpressionBuilder.typeLiteral;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * LEAD and LAG aggregate functions return the value of given expression evaluated at given offset.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.java
index 1e68247..91ffa59 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.TimeType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.greaterThan;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.isNull;
 import static org.apache.flink.table.expressions.ExpressionBuilder.nullOf;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in max aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinAggFunction.java
index 6a85b0b..7e5f27b 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/MinAggFunction.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.TimeType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.isNull;
 import static org.apache.flink.table.expressions.ExpressionBuilder.lessThan;
 import static org.apache.flink.table.expressions.ExpressionBuilder.nullOf;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in min aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankAggFunction.java
index 2d75768..9dfaf96 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankAggFunction.java
@@ -27,13 +27,13 @@ import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
 
 import java.util.Arrays;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.and;
 import static org.apache.flink.table.expressions.ExpressionBuilder.equalTo;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
 import static org.apache.flink.table.expressions.ExpressionBuilder.not;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in rank aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java
index 20541a3..39340ed 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java
@@ -32,11 +32,11 @@ import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.Optional;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.equalTo;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.isNull;
 import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in rank like aggregate function, e.g. rank, dense_rank
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RowNumberAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RowNumberAggFunction.java
index 72a887b..e539d93 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RowNumberAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RowNumberAggFunction.java
@@ -24,9 +24,9 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in row_number aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SingleValueAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SingleValueAggFunction.java
index 1b86628..1bf82b0 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SingleValueAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SingleValueAggFunction.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.TimeType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.equalTo;
 import static org.apache.flink.table.expressions.ExpressionBuilder.greaterThan;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
@@ -35,6 +34,7 @@ import static org.apache.flink.table.expressions.ExpressionBuilder.nullOf;
 import static org.apache.flink.table.expressions.ExpressionBuilder.or;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
 import static org.apache.flink.table.expressions.ExpressionBuilder.throwException;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * Base class for built-in single value aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/Sum0AggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/Sum0AggFunction.java
index 96dc41a..2bfd909 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/Sum0AggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/Sum0AggFunction.java
@@ -27,12 +27,12 @@ import org.apache.flink.table.types.logical.DecimalType;
 
 import java.math.BigDecimal;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.isNull;
 import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
 import static org.apache.flink.table.expressions.ExpressionBuilder.minus;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in sum0 aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SumAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SumAggFunction.java
index 5fee090..1f1455a 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SumAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SumAggFunction.java
@@ -26,11 +26,11 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.isNull;
 import static org.apache.flink.table.expressions.ExpressionBuilder.nullOf;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in sum aggregate function.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.java
index ecf6cd4..775c92f 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.expressions.ExpressionBuilder.equalTo;
 import static org.apache.flink.table.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.expressions.ExpressionBuilder.isNull;
@@ -33,6 +32,7 @@ import static org.apache.flink.table.expressions.ExpressionBuilder.literal;
 import static org.apache.flink.table.expressions.ExpressionBuilder.minus;
 import static org.apache.flink.table.expressions.ExpressionBuilder.nullOf;
 import static org.apache.flink.table.expressions.ExpressionBuilder.plus;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 
 /**
  * built-in sum aggregate function with retraction.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/DeclarativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/DeclarativeAggCodeGen.scala
index 49f8ba1..a3bf6e0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/DeclarativeAggCodeGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/DeclarativeAggCodeGen.scala
@@ -20,13 +20,13 @@ package org.apache.flink.table.codegen.agg
 import org.apache.flink.table.codegen.CodeGenUtils.primitiveTypeTermForType
 import org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator.DISTINCT_KEY_TERM
 import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression}
-import org.apache.flink.table.expressions.{ApiExpressionUtils, ResolvedDistinctKeyReference, _}
+import org.apache.flink.table.expressions.{ResolvedDistinctKeyReference, _}
 import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.plan.util.AggregateInfo
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.types.logical.LogicalType
-
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils
 
 import scala.collection.JavaConverters._
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
index 58c5654..8e7d31de 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala
@@ -28,7 +28,8 @@ import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.OperatorCodeGenerator.STREAM_RECORD
 import org.apache.flink.table.codegen._
 import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
-import org.apache.flink.table.expressions.{UnresolvedCallExpression, Expression, ExpressionVisitor, FieldReferenceExpression, ResolvedAggInputReference, ResolvedAggLocalReference, RexNodeConverter, TypeLiteralExpression, UnresolvedReferenceExpression, ValueLiteralExpression, _}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils
+import org.apache.flink.table.expressions.{Expression, ExpressionVisitor, FieldReferenceExpression, ResolvedAggInputReference, ResolvedAggLocalReference, RexNodeConverter, TypeLiteralExpression, UnresolvedCallExpression, UnresolvedReferenceExpression, ValueLiteralExpression, _}
 import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getAccumulatorTypeOfAggregateFunction, getAggUserDefinedInputTypes}
 import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala
index ab343db..e2e9f83 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -27,7 +27,8 @@ import org.apache.flink.table.codegen._
 import org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.buildAggregateArgsMapping
 import org.apache.flink.table.codegen.sort.SortCodeGenerator
 import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, GenericRow, JoinedRow}
-import org.apache.flink.table.expressions.{UnresolvedCallExpression, Expression, ExpressionVisitor, FieldReferenceExpression, ResolvedAggInputReference, RexNodeConverter, TypeLiteralExpression, UnresolvedReferenceExpression, ValueLiteralExpression, _}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils
+import org.apache.flink.table.expressions.{Expression, ExpressionVisitor, FieldReferenceExpression, ResolvedAggInputReference, RexNodeConverter, TypeLiteralExpression, UnresolvedCallExpression, UnresolvedReferenceExpression, ValueLiteralExpression, _}
 import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction}
 import org.apache.flink.table.generated.{NormalizedKeyComputer, RecordComparator}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
index 2383da6..557f3c2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.plan.rules.logical
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis
 import org.apache.flink.table.expressions.{FieldReferenceExpression, WindowReference}
 import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.plan.logical.{LogicalWindow, SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
index ec0a3d6..2f51b5b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RexNodeExtractor.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.util
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.catalog.{FunctionCatalog, FunctionLookup}
-import org.apache.flink.table.expressions.ApiExpressionUtils._
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{AND, CAST, OR}
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index 857ab92..46fda8f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -31,7 +31,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{DataTypes, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall, typeLiteral}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, typeLiteral}
 import org.apache.flink.table.expressions.{PlannerResolvedFieldReference, ResolvedFieldReference, RexNodeConverter}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
index b20936d..540f79f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -23,7 +23,7 @@ import java.util
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.descriptors.Rowtime
-import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall, typeLiteral, valueLiteral}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, typeLiteral, valueLiteral}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 0661f03..94144b9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.catalog.FunctionCatalog
-import org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions.SumAggFunction.DoubleSumAggFunction
 import org.apache.flink.table.functions.aggfunctions.{DenseRankAggFunction, RankAggFunction, RowNumberAggFunction}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
index 8751f80..e4641af 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.util
 
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.catalog.FunctionCatalog
-import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.expressions.utils.Func1
 import org.apache.flink.table.functions.AggregateFunctionDefinition
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index 251c977..4c407f8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableSchema, Types}
-import org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall
 import org.apache.flink.table.expressions.{Expression, FieldReferenceExpression, UnresolvedCallExpression, ValueLiteralExpression}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
index 2e45b28..3b62170 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
@@ -35,14 +35,14 @@ import org.apache.flink.table.expressions.AggFunctionCall;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionBridge;
-import org.apache.flink.table.expressions.ExpressionResolver;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionRequirement;
@@ -64,7 +64,7 @@ import static java.lang.String.format;
 import static java.util.Collections.singletonList;
 import static java.util.stream.Collectors.toList;
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.isFunctionOfKind;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
 import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
 import static org.apache.flink.table.functions.FunctionKind.TABLE_AGGREGATE;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
index caac292..c73ca47 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
@@ -21,20 +21,20 @@ package org.apache.flink.table.operations;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
 
 /**
  * Utility class for creating valid alias expressions that can be later used as a projection.
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
index 8fb0625..2e10ee8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ResolvedExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.typeutils.FieldInfoUtils;
@@ -36,7 +36,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.isFunctionOfKind;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
 import static org.apache.flink.table.functions.FunctionKind.TABLE;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ColumnOperationUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ColumnOperationUtils.java
index eaa3024..b84d33a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ColumnOperationUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ColumnOperationUtils.java
@@ -20,11 +20,11 @@ package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
-import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -33,7 +33,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
 import static org.apache.flink.table.operations.OperationExpressionsUtils.extractName;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java
index 7d0400e..45748db 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ResolvedExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java
index c93c2fa..5b5705a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.ExpressionBridge;
 import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.expressions.PlannerExpressionConverter$;
-import org.apache.flink.table.expressions.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
 
 /**
  * Temporary solution for looking up the {@link OperationTreeBuilder}. The tree builder
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java
index fe6912e..f5f1687 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java
@@ -26,14 +26,14 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionBridge;
-import org.apache.flink.table.expressions.ExpressionResolver;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
 import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.logical.LogicalType;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java
index 824463e..e2d86e8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.operations;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.ExpressionResolver;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ResolvedExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 
 import java.util.List;
 import java.util.stream.Collectors;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
index a6b5d30..798e83e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
@@ -95,9 +95,9 @@ import scala.Some;
 
 import static java.util.Arrays.asList;
 import static java.util.stream.Collectors.toList;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.isFunctionOfKind;
-import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
 import static org.apache.flink.table.expressions.ExpressionUtils.extractValue;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunctionOfKind;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
 import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
 import static org.apache.flink.table.functions.FunctionKind.TABLE_AGGREGATE;
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index c1acd0a..b8425d7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -30,7 +30,8 @@ import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory}
 import org.apache.flink.table.catalog.CatalogManager
 import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.{ApiExpressionDefaultVisitor, Expression, UnresolvedCallExpression}
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
+import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
 import org.apache.flink.table.operations.DataSetQueryOperation
 import org.apache.flink.table.plan.BatchOptimizer
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index c52c0f0..f82a68d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder}
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.expressions.lookups.TableReferenceLookup
+import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
 import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _}
 import org.apache.flink.table.operations.{CatalogQueryOperation, PlannerQueryOperation, TableSourceQueryOperation, _}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index c25e440..c84d5de 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -24,7 +24,7 @@ import java.time.{LocalDate, LocalDateTime}
 
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.api.{DataTypes, Over, Table, ValidationException}
-import org.apache.flink.table.expressions.ApiExpressionUtils._
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{RANGE_TO, WITH_COLUMNS, E => FDE, UUID => FDUUID, _}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction, UserFunctionsTypeHelper, _}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionBridge.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionBridge.scala
index 4bf7b32..7000bad 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionBridge.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionBridge.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions
 
 import org.apache.flink.table.catalog.FunctionLookup
+import org.apache.flink.table.expressions.resolver.LookupCallResolver
 
 /**
   * Bridges between API [[Expression]]s (for both Java and Scala) and final expression stack.
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
index 94e76b7..32161a3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
@@ -22,7 +22,8 @@ import _root_.java.util.{List => JList}
 
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.api._
-import org.apache.flink.table.expressions.ApiExpressionUtils._
+import org.apache.flink.table.delegation.PlannerExpressionParser
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala
index 0f82604..3fba57c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala
@@ -22,10 +22,12 @@ import java.util.{Collections, Optional, List => JList}
 
 import org.apache.flink.table.api._
 import org.apache.flink.table.catalog.FunctionLookup
-import org.apache.flink.table.expressions.ApiExpressionUtils.{isFunctionOfKind, unresolvedCall, unresolvedRef, valueLiteral}
-import org.apache.flink.table.expressions.ExpressionResolver.resolverFor
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{isFunctionOfKind, unresolvedCall, unresolvedRef, valueLiteral}
+import org.apache.flink.table.expressions.resolver.ExpressionResolver.resolverFor
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.expressions.lookups.TableReferenceLookup
+import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
+import org.apache.flink.table.expressions.resolver.{ExpressionResolver, LookupCallResolver}
+import org.apache.flink.table.expressions.utils.{ApiExpressionDefaultVisitor, ApiExpressionUtils}
 import org.apache.flink.table.functions.FunctionKind.{SCALAR, TABLE}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.functions.{AggregateFunctionDefinition, BuiltInFunctionDefinitions, TableFunctionDefinition}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
index 48f0a6f..998a731 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.catalog.FunctionCatalog
-import org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.util.JavaScalaConversionUtil
 import org.apache.flink.util.Preconditions
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/KeywordParseTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/KeywordParseTest.scala
index 7d75964..da7ecce 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/KeywordParseTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/KeywordParseTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.expressions
 
-import org.apache.flink.table.expressions.ApiExpressionUtils.{unresolvedCall, lookupCall, unresolvedRef}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, lookupCall, unresolvedRef}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.junit.Assert.assertEquals
 import org.junit.Test