Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/02 15:46:40 UTC

[flink] 02/03: [FLINK-12906][table-planner][table-api-java] Ported OperationTreeBuilder to table-api-java module

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d167a3670614901e4ef011af92b4045c7eb1612
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Sat Jun 1 19:08:56 2019 +0200

    [FLINK-12906][table-planner][table-api-java] Ported OperationTreeBuilder to table-api-java module
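
    The builder was previously reachable from the API module only via
    reflection (see the removed lookupTreeBuilder/OperationTreeBuilderFactory
    below). A minimal sketch of the new, direct construction path, using the
    names that appear in this diff (the lambda is the TableReferenceLookup
    that TableEnvironmentImpl passes in):

        OperationTreeBuilder builder = OperationTreeBuilder.create(
            functionCatalog,
            path -> scanInternal(path)
                .map(tableOperation -> new TableReferenceExpression(path, tableOperation)),
            !settings.isBatchMode());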
---
 .../java/internal/StreamTableEnvironmentImpl.java  |   8 +-
 .../flink/table/api/EnvironmentSettings.java       |   7 +
 .../table/api/internal/TableEnvironmentImpl.java   |  28 +-
 .../expressions/utils/ApiExpressionUtils.java      |  29 +-
 .../operations/OperationExpressionsUtils.java      |   4 +-
 .../table/operations/OperationTreeBuilder.java     | 680 +++++++++++++++++++--
 .../internal/StreamTableEnvironmentImpl.scala      |  10 +-
 .../flink/table/expressions/ExpressionUtils.java   |   8 +-
 .../operations/OperationTreeBuilderFactory.java    |  44 --
 .../flink/table/api/internal/TableEnvImpl.scala    |  17 +-
 .../operations/OperationTreeBuilderImpl.scala      | 600 ------------------
 .../api/stream/StreamTableEnvironmentTest.scala    |   3 +-
 .../flink/table/api/stream/sql/AggregateTest.scala |   3 +-
 .../apache/flink/table/utils/TableTestBase.scala   |   6 +-
 14 files changed, 702 insertions(+), 745 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 6b37690..05815dd 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -85,8 +85,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			TableConfig tableConfig,
 			StreamExecutionEnvironment executionEnvironment,
 			Planner planner,
-			Executor executor) {
-		super(catalogManager, tableConfig, executor, functionCatalog, planner);
+			Executor executor,
+			boolean isStreaming) {
+		super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreaming);
 		this.executionEnvironment = executionEnvironment;
 	}
 
@@ -119,7 +120,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			tableConfig,
 			executionEnvironment,
 			planner,
-			executor
+			executor,
+			!settings.isBatchMode()
 		);
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 37ba179..70b7ffd 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -114,6 +114,13 @@ public class EnvironmentSettings {
 		return builtInDatabaseName;
 	}
 
+	/**
+	 * Tells if the {@link TableEnvironment} should work in batch or streaming mode.
+	 */
+	public boolean isBatchMode() {
+		return isBatchMode;
+	}
+
 	@Internal
 	public Map<String, String> toPlannerProperties() {
 		Map<String, String> properties = new HashMap<>(toCommonProperties());
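
The new isBatchMode() getter is what the bridge environments negate to derive the
isStreaming flag passed down to TableEnvironmentImpl. A usage sketch, assuming the
builder methods that EnvironmentSettings ships with:

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .build();
    // settings.isBatchMode() == false, so StreamTableEnvironmentImpl.create(...)
    // constructs the environment with isStreaming = true
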
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 727727a..9b04f56 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ExternalCatalog;
 import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -46,7 +45,6 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.descriptors.TableDescriptor;
 import org.apache.flink.table.expressions.TableReferenceExpression;
-import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -60,7 +58,6 @@ import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
 import org.apache.flink.util.StringUtils;
 
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -92,7 +89,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 			TableConfig tableConfig,
 			Executor executor,
 			FunctionCatalog functionCatalog,
-			Planner planner) {
+			Planner planner,
+			boolean isStreaming) {
 		this.catalogManager = catalogManager;
 		this.execEnv = executor;
 
@@ -103,32 +101,16 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 		this.functionCatalog = functionCatalog;
 		this.planner = planner;
-		this.operationTreeBuilder = lookupTreeBuilder(
+		this.operationTreeBuilder = OperationTreeBuilder.create(
+			functionCatalog,
 			path -> {
 				Optional<CatalogQueryOperation> catalogTableOperation = scanInternal(path);
 				return catalogTableOperation.map(tableOperation -> new TableReferenceExpression(path, tableOperation));
 			},
-			functionCatalog
+			isStreaming
 		);
 	}
 
-	private static OperationTreeBuilder lookupTreeBuilder(
-		TableReferenceLookup tableReferenceLookup,
-		FunctionLookup functionDefinitionCatalog) {
-		try {
-			Class<?> clazz = Class.forName("org.apache.flink.table.operations.OperationTreeBuilderFactory");
-			Method createMethod = clazz.getMethod(
-				"create",
-				TableReferenceLookup.class,
-				FunctionLookup.class);
-
-			return (OperationTreeBuilder) createMethod.invoke(null, tableReferenceLookup, functionDefinitionCatalog);
-		} catch (Exception e) {
-			throw new TableException(
-				"Could not instantiate the operation builder. Make sure the planner module is on the classpath");
-		}
-	}
-
 	@VisibleForTesting
 	public Planner getPlanner() {
 		return planner;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java
index f04878f..5d0c2c3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.types.DataType;
@@ -116,18 +117,34 @@ public final class ApiExpressionUtils {
 	/**
 	 * Checks if the expression is a function call of given type.
 	 *
-	 * @param expr expression to check
+	 * @param expression expression to check
 	 * @param kind expected type of function
 	 * @return true if the expression is function call of given type, false otherwise
 	 */
-	public static boolean isFunctionOfKind(Expression expr, FunctionKind kind) {
-		if (expr instanceof UnresolvedCallExpression) {
-			return ((UnresolvedCallExpression) expr).getFunctionDefinition().getKind() == kind;
+	public static boolean isFunctionOfKind(Expression expression, FunctionKind kind) {
+		if (expression instanceof UnresolvedCallExpression) {
+			return ((UnresolvedCallExpression) expression).getFunctionDefinition().getKind() == kind;
 		}
-		if (expr instanceof CallExpression) {
-			return ((CallExpression) expr).getFunctionDefinition().getKind() == kind;
+		if (expression instanceof CallExpression) {
+			return ((CallExpression) expression).getFunctionDefinition().getKind() == kind;
 		}
 		return false;
+	}
 
+	/**
+	 * Checks if the given expression is a call to the given built-in function.
+	 *
+	 * @param expression expression to check
+	 * @param functionDefinition expected function definition
+	 * @return true if the expression is a call to the given function, false otherwise
+	 */
+	public static boolean isFunction(Expression expression, BuiltInFunctionDefinition functionDefinition) {
+		if (expression instanceof UnresolvedCallExpression) {
+			return ((UnresolvedCallExpression) expression).getFunctionDefinition() == functionDefinition;
+		}
+		if (expression instanceof CallExpression) {
+			return ((CallExpression) expression).getFunctionDefinition() == functionDefinition;
+		}
+		return false;
 	}
 }
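
The new isFunction complements the existing isFunctionOfKind: the former matches
one concrete FunctionDefinition by reference, the latter matches any function of a
given kind. A small sketch (the expression is made up for illustration):

    Expression aliased = ApiExpressionUtils.unresolvedCall(
        BuiltInFunctionDefinitions.AS,
        ApiExpressionUtils.unresolvedRef("f0"),
        ApiExpressionUtils.valueLiteral("newName"));

    ApiExpressionUtils.isFunction(aliased, BuiltInFunctionDefinitions.AS);     // true
    ApiExpressionUtils.isFunctionOfKind(aliased, FunctionKind.AGGREGATE);      // false
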
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
index 8e9912b..eb2030d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
@@ -155,8 +155,8 @@ public class OperationExpressionsUtils {
 		private final Map<Expression, String> properties;
 
 		private AggregationAndPropertiesReplacer(
-			Map<Expression, String> aggregates,
-			Map<Expression, String> properties) {
+				Map<Expression, String> aggregates,
+				Map<Expression, String> properties) {
 			this.aggregates = aggregates;
 			this.properties = properties;
 		}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java
index 50eb8d9..37de6da 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java
@@ -19,92 +19,676 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionUtils;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.expressions.resolver.LookupCallResolver;
+import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
+import org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow;
+import org.apache.flink.table.operations.utils.factories.AggregateOperationFactory;
+import org.apache.flink.table.operations.utils.factories.AliasOperationUtils;
+import org.apache.flink.table.operations.utils.factories.CalculatedTableFactory;
+import org.apache.flink.table.operations.utils.factories.ColumnOperationUtils;
+import org.apache.flink.table.operations.utils.factories.JoinOperationFactory;
+import org.apache.flink.table.operations.utils.factories.ProjectionOperationFactory;
+import org.apache.flink.table.operations.utils.factories.SetOperationFactory;
+import org.apache.flink.table.operations.utils.factories.SortOperationFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.typeutils.FieldInfoUtils;
+import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.operations.SetQueryOperation.SetQueryOperationType.INTERSECT;
+import static org.apache.flink.table.operations.SetQueryOperation.SetQueryOperationType.MINUS;
+import static org.apache.flink.table.operations.SetQueryOperation.SetQueryOperationType.UNION;
 
 /**
- * Builder for validated {@link QueryOperation}s.
- *
- * <p>TODO. This is a temporary solution. The actual implementation should be ported.
+ * A builder for constructing validated {@link QueryOperation}s.
  */
 @Internal
-public interface OperationTreeBuilder {
-	QueryOperation project(List<Expression> projectList, QueryOperation child);
+public final class OperationTreeBuilder {
+
+	private final FunctionLookup functionCatalog;
+	private final TableReferenceLookup tableReferenceLookup;
+	private final LookupCallResolver lookupResolver;
+
+	/**
+	 * Utility classes for constructing a validated operation of a certain type.
+	 */
+	private final ProjectionOperationFactory projectionOperationFactory;
+	private final SortOperationFactory sortOperationFactory;
+	private final CalculatedTableFactory calculatedTableFactory;
+	private final SetOperationFactory setOperationFactory;
+	private final AggregateOperationFactory aggregateOperationFactory;
+	private final JoinOperationFactory joinOperationFactory;
+
+	private OperationTreeBuilder(
+			FunctionLookup functionLookup,
+			TableReferenceLookup tableReferenceLookup,
+			ProjectionOperationFactory projectionOperationFactory,
+			SortOperationFactory sortOperationFactory,
+			CalculatedTableFactory calculatedTableFactory,
+			SetOperationFactory setOperationFactory,
+			AggregateOperationFactory aggregateOperationFactory,
+			JoinOperationFactory joinOperationFactory) {
+		this.functionCatalog = functionLookup;
+		this.tableReferenceLookup = tableReferenceLookup;
+		this.projectionOperationFactory = projectionOperationFactory;
+		this.sortOperationFactory = sortOperationFactory;
+		this.calculatedTableFactory = calculatedTableFactory;
+		this.setOperationFactory = setOperationFactory;
+		this.aggregateOperationFactory = aggregateOperationFactory;
+		this.joinOperationFactory = joinOperationFactory;
+		this.lookupResolver = new LookupCallResolver(functionLookup);
+	}
+
+	public static OperationTreeBuilder create(
+			FunctionLookup functionCatalog,
+			TableReferenceLookup tableReferenceLookup,
+			boolean isStreaming) {
+		return new OperationTreeBuilder(
+			functionCatalog,
+			tableReferenceLookup,
+			new ProjectionOperationFactory(),
+			new SortOperationFactory(isStreaming),
+			new CalculatedTableFactory(),
+			new SetOperationFactory(isStreaming),
+			new AggregateOperationFactory(isStreaming),
+			new JoinOperationFactory()
+		);
+	}
+
+	public QueryOperation project(List<Expression> projectList, QueryOperation child) {
+		return project(projectList, child, false);
+	}
+
+	public QueryOperation project(List<Expression> projectList, QueryOperation child, boolean explicitAlias) {
+		projectList.forEach(p -> p.accept(new NoAggregateChecker(
+			"Aggregate functions are not supported in the select right after the aggregate" +
+				" or flatAggregate operation.")));
+		projectList.forEach(p -> p.accept(new NoWindowPropertyChecker(
+			"Window properties can only be used on windowed tables.")));
+		return projectInternal(projectList, child, explicitAlias, Collections.emptyList());
+	}
+
+	public QueryOperation project(List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows) {
+
+		Preconditions.checkArgument(!overWindows.isEmpty());
+
+		projectList.forEach(p -> p.accept(new NoWindowPropertyChecker(
+			"Window start and end properties are not available for Over windows.")));
+
+		return projectInternal(
+			projectList,
+			child,
+			true,
+			overWindows);
+	}
+
+	private QueryOperation projectInternal(
+			List<Expression> projectList,
+			QueryOperation child,
+			boolean explicitAlias,
+			List<OverWindow> overWindows) {
+
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, child)
+			.withOverWindows(overWindows)
+			.build();
+		List<ResolvedExpression> projections = resolver.resolve(projectList);
+		return projectionOperationFactory.create(projections, child, explicitAlias, resolver.postResolverFactory());
+	}
+
+	/**
+	 * Adds additional columns. Existing fields will be replaced if replaceIfExist is true.
+	 */
+	public QueryOperation addColumns(boolean replaceIfExist, List<Expression> fieldLists, QueryOperation child) {
+		final List<Expression> newColumns;
+		if (replaceIfExist) {
+			String[] fieldNames = child.getTableSchema().getFieldNames();
+			newColumns = ColumnOperationUtils.addOrReplaceColumns(Arrays.asList(fieldNames), fieldLists);
+		} else {
+			newColumns = new ArrayList<>(fieldLists);
+			newColumns.add(0, new UnresolvedReferenceExpression("*"));
+		}
+		return project(newColumns, child, false);
+	}
+
+	public QueryOperation renameColumns(List<Expression> aliases, QueryOperation child) {
+
+		ExpressionResolver resolver = getResolver(child);
+		String[] inputFieldNames = child.getTableSchema().getFieldNames();
+		List<Expression> validateAliases = ColumnOperationUtils.renameColumns(
+			Arrays.asList(inputFieldNames),
+			resolver.resolveExpanding(aliases));
+
+		return project(validateAliases, child, false);
+	}
+
+	public QueryOperation dropColumns(List<Expression> fieldLists, QueryOperation child) {
+
+		ExpressionResolver resolver = getResolver(child);
+		String[] inputFieldNames = child.getTableSchema().getFieldNames();
+		List<Expression> finalFields = ColumnOperationUtils.dropFields(
+			Arrays.asList(inputFieldNames),
+			resolver.resolveExpanding(fieldLists));
+
+		return project(finalFields, child, false);
+	}
+
+	public QueryOperation aggregate(List<Expression> groupingExpressions, List<Expression> aggregates, QueryOperation child) {
 
-	QueryOperation project(List<Expression> projectList, QueryOperation child, boolean explicitAlias);
+		ExpressionResolver resolver = getResolver(child);
 
-	QueryOperation project(List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows);
+		List<ResolvedExpression> resolvedGroupings = resolver.resolve(groupingExpressions);
+		List<ResolvedExpression> resolvedAggregates = resolver.resolve(aggregates);
 
-	QueryOperation windowAggregate(
+		return aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child);
+	}
+
+	public QueryOperation windowAggregate(
 		List<Expression> groupingExpressions,
 		GroupWindow window,
 		List<Expression> windowProperties,
 		List<Expression> aggregates,
-		QueryOperation child);
+		QueryOperation child) {
 
-	QueryOperation join(
-		QueryOperation left,
-		QueryOperation right,
-		JoinType joinType,
-		Optional<Expression> condition,
-		boolean correlated);
+		ExpressionResolver resolver = getResolver(child);
+		ResolvedGroupWindow resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver);
 
-	QueryOperation joinLateral(
-		QueryOperation left,
-		Expression tableFunction,
-		JoinType joinType,
-		Optional<Expression> condition);
+		ExpressionResolver resolverWithWindowReferences = ExpressionResolver.resolverFor(
+				tableReferenceLookup,
+				functionCatalog,
+				child)
+			.withLocalReferences(
+				new LocalReferenceExpression(
+					resolvedWindow.getAlias(),
+					resolvedWindow.getTimeAttribute().getOutputDataType()))
+			.build();
 
-	Expression resolveExpression(Expression expression, QueryOperation... tableOperation);
+		List<ResolvedExpression> convertedGroupings = resolverWithWindowReferences.resolve(groupingExpressions);
+		List<ResolvedExpression> convertedAggregates = resolverWithWindowReferences.resolve(aggregates);
+		List<ResolvedExpression> convertedProperties = resolverWithWindowReferences.resolve(windowProperties);
 
-	QueryOperation sort(List<Expression> fields, QueryOperation child);
+		return aggregateOperationFactory.createWindowAggregate(
+			convertedGroupings,
+			convertedAggregates,
+			convertedProperties,
+			resolvedWindow,
+			child);
+	}
 
-	QueryOperation limitWithOffset(int offset, QueryOperation child);
+	public QueryOperation join(
+			QueryOperation left,
+			QueryOperation right,
+			JoinType joinType,
+			Optional<Expression> condition,
+			boolean correlated) {
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, left, right)
+			.build();
+		Optional<ResolvedExpression> resolvedCondition = condition.map(expr -> resolveSingleExpression(expr, resolver));
 
-	QueryOperation limitWithFetch(int fetch, QueryOperation child);
+		return joinOperationFactory.create(
+			left,
+			right,
+			joinType,
+			resolvedCondition.orElse(valueLiteral(true)),
+			correlated);
+	}
 
-	QueryOperation alias(List<Expression> fields, QueryOperation child);
+	public QueryOperation joinLateral(
+			QueryOperation left,
+			Expression tableFunction,
+			JoinType joinType,
+			Optional<Expression> condition) {
+		ExpressionResolver resolver = getResolver(left);
+		ResolvedExpression resolvedFunction = resolveSingleExpression(tableFunction, resolver);
 
-	QueryOperation filter(Expression condition, QueryOperation child);
+		QueryOperation temporalTable = calculatedTableFactory.create(
+			resolvedFunction,
+			left.getTableSchema().getFieldNames());
 
-	QueryOperation distinct(QueryOperation child);
+		return join(left, temporalTable, joinType, condition, true);
+	}
 
-	QueryOperation minus(QueryOperation left, QueryOperation right, boolean all);
+	public Expression resolveExpression(Expression expression, QueryOperation... tableOperation) {
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			tableReferenceLookup,
+			functionCatalog,
+			tableOperation).build();
 
-	QueryOperation intersect(QueryOperation left, QueryOperation right, boolean all);
+		return resolveSingleExpression(expression, resolver);
+	}
 
-	QueryOperation union(QueryOperation left, QueryOperation right, boolean all);
+	private ResolvedExpression resolveSingleExpression(Expression expression, ExpressionResolver resolver) {
+		List<ResolvedExpression> resolvedExpression = resolver.resolve(Collections.singletonList(expression));
+		if (resolvedExpression.size() != 1) {
+			throw new ValidationException("Expected single expression");
+		} else {
+			return resolvedExpression.get(0);
+		}
+	}
 
-	/* Extensions */
+	public QueryOperation sort(List<Expression> fields, QueryOperation child) {
 
-	QueryOperation addColumns(boolean replaceIfExist, List<Expression> fieldLists, QueryOperation child);
+		ExpressionResolver resolver = getResolver(child);
+		List<ResolvedExpression> resolvedFields = resolver.resolve(fields);
 
-	QueryOperation renameColumns(List<Expression> aliases, QueryOperation child);
+		return sortOperationFactory.createSort(resolvedFields, child, resolver.postResolverFactory());
+	}
 
-	QueryOperation dropColumns(List<Expression> fieldLists, QueryOperation child);
+	public QueryOperation limitWithOffset(int offset, QueryOperation child) {
+		return sortOperationFactory.createLimitWithOffset(offset, child);
+	}
 
-	QueryOperation aggregate(List<Expression> groupingExpressions, List<Expression> aggregates, QueryOperation child);
+	public QueryOperation limitWithFetch(int fetch, QueryOperation child) {
+		return sortOperationFactory.createLimitWithFetch(fetch, child);
+	}
 
-	QueryOperation map(Expression mapFunction, QueryOperation child);
+	public QueryOperation alias(List<Expression> fields, QueryOperation child) {
+		List<Expression> newFields = AliasOperationUtils.createAliasList(fields, child);
 
-	QueryOperation flatMap(Expression tableFunction, QueryOperation child);
+		return project(newFields, child, true);
+	}
 
-	QueryOperation aggregate(List<Expression> groupingExpressions, Expression aggregate, QueryOperation child);
+	public QueryOperation filter(Expression condition, QueryOperation child) {
 
-	QueryOperation tableAggregate(
-		List<Expression> groupingExpressions,
-		Expression tableAggFunction,
-		QueryOperation child);
+		ExpressionResolver resolver = getResolver(child);
+		ResolvedExpression resolvedExpression = resolveSingleExpression(condition, resolver);
+		DataType conditionType = resolvedExpression.getOutputDataType();
+		if (!LogicalTypeChecks.hasRoot(conditionType.getLogicalType(), LogicalTypeRoot.BOOLEAN)) {
+			throw new ValidationException("Filter operator requires a boolean expression as input," +
+				" but $condition is of type " + conditionType);
+		}
 
-	QueryOperation windowTableAggregate(
-		List<Expression> groupingExpressions,
-		GroupWindow window,
-		List<Expression> windowProperties,
-		Expression tableAggFunction,
-		QueryOperation child);
+		return new FilterQueryOperation(resolvedExpression, child);
+	}
+
+	public QueryOperation distinct(QueryOperation child) {
+		return new DistinctQueryOperation(child);
+	}
+
+	public QueryOperation minus(QueryOperation left, QueryOperation right, boolean all) {
+		return setOperationFactory.create(MINUS, left, right, all);
+	}
+
+	public QueryOperation intersect(QueryOperation left, QueryOperation right, boolean all) {
+		return setOperationFactory.create(INTERSECT, left, right, all);
+	}
+
+	public QueryOperation union(QueryOperation left, QueryOperation right, boolean all) {
+		return setOperationFactory.create(UNION, left, right, all);
+	}
+
+	public QueryOperation map(Expression mapFunction, QueryOperation child) {
+
+		Expression resolvedMapFunction = mapFunction.accept(lookupResolver);
+
+		if (!ApiExpressionUtils.isFunctionOfKind(resolvedMapFunction, FunctionKind.SCALAR)) {
+			throw new ValidationException("Only a scalar function can be used in the map operator.");
+		}
+
+		Expression expandedFields = unresolvedCall(BuiltInFunctionDefinitions.FLATTEN, resolvedMapFunction);
+		return project(Collections.singletonList(expandedFields), child, false);
+	}
+
+	public QueryOperation flatMap(Expression tableFunction, QueryOperation child) {
+
+		Expression resolvedTableFunction = tableFunction.accept(lookupResolver);
+
+		if (!ApiExpressionUtils.isFunctionOfKind(resolvedTableFunction, FunctionKind.TABLE)) {
+			throw new ValidationException("Only a table function can be used in the flatMap operator.");
+		}
+
+		TypeInformation<?> resultType = ((TableFunctionDefinition) ((UnresolvedCallExpression) resolvedTableFunction)
+			.getFunctionDefinition())
+			.getResultType();
+		List<String> originFieldNames = Arrays.asList(FieldInfoUtils.getFieldNames(resultType));
+
+		List<String> childFields = Arrays.asList(child.getTableSchema().getFieldNames());
+		Set<String> usedFieldNames = new HashSet<>(childFields);
+
+		List<Expression> args = new ArrayList<>();
+		for (String originFieldName : originFieldNames) {
+			String resultName = getUniqueName(originFieldName, usedFieldNames);
+			usedFieldNames.add(resultName);
+			args.add(valueLiteral(resultName));
+		}
+
+		args.add(0, resolvedTableFunction);
+		Expression renamedTableFunction = unresolvedCall(
+			BuiltInFunctionDefinitions.AS,
+			args.toArray(new Expression[0]));
+		QueryOperation joinNode = joinLateral(child, renamedTableFunction, JoinType.INNER, Optional.empty());
+		QueryOperation rightNode = dropColumns(
+			childFields.stream().map(UnresolvedReferenceExpression::new).collect(Collectors.toList()),
+			joinNode);
+		return alias(
+			originFieldNames.stream().map(UnresolvedReferenceExpression::new).collect(Collectors.toList()),
+			rightNode);
+	}
+
+	public QueryOperation aggregate(List<Expression> groupingExpressions, Expression aggregate, QueryOperation child) {
+		Expression resolvedAggregate = aggregate.accept(lookupResolver);
+		AggregateWithAlias aggregateWithAlias = resolvedAggregate.accept(new ExtractAliasAndAggregate());
+
+		// turn the aggregate into a named aggregate, because it will be verified later.
+		String[] childNames = child.getTableSchema().getFieldNames();
+		Expression aggregateRenamed = addAliasToTheCallInGroupings(
+			Arrays.asList(childNames),
+			Collections.singletonList(aggregateWithAlias.aggregate)).get(0);
+
+		// get agg table
+		QueryOperation aggregateOperation = this.aggregate(
+			groupingExpressions,
+			Collections.singletonList(aggregateRenamed),
+			child);
+
+		// flatten the aggregate function
+		String[] aggNames = aggregateOperation.getTableSchema().getFieldNames();
+		List<Expression> flattenedExpressions = Arrays.asList(aggNames)
+			.subList(0, groupingExpressions.size())
+			.stream()
+			.map(ApiExpressionUtils::unresolvedRef)
+			.collect(Collectors.toCollection(ArrayList::new));
+
+		flattenedExpressions.add(unresolvedCall(
+			BuiltInFunctionDefinitions.FLATTEN,
+			unresolvedRef(aggNames[aggNames.length - 1])));
+
+		QueryOperation flattenedProjection = this.project(flattenedExpressions, aggregateOperation);
+
+		// add alias
+		return aliasBackwardFields(flattenedProjection, aggregateWithAlias.aliases, groupingExpressions.size());
+	}
+
+	private static class AggregateWithAlias {
+		private final UnresolvedCallExpression aggregate;
+		private final List<String> aliases;
+
+		private AggregateWithAlias(UnresolvedCallExpression aggregate, List<String> aliases) {
+			this.aggregate = aggregate;
+			this.aliases = aliases;
+		}
+	}
+
+	private static class ExtractAliasAndAggregate extends ApiExpressionDefaultVisitor<AggregateWithAlias> {
+		@Override
+		public AggregateWithAlias visit(UnresolvedCallExpression unresolvedCall) {
+			if (ApiExpressionUtils.isFunction(unresolvedCall, BuiltInFunctionDefinitions.AS)) {
+				Expression expression = unresolvedCall.getChildren().get(0);
+				if (expression instanceof UnresolvedCallExpression) {
+					List<String> aliases = extractAliases(unresolvedCall);
+
+					return getAggregate((UnresolvedCallExpression) expression, aliases)
+						.orElseGet(() -> defaultMethod(unresolvedCall));
+				} else {
+					return defaultMethod(unresolvedCall);
+				}
+			}
+
+			return getAggregate(unresolvedCall, Collections.emptyList()).orElseGet(() -> defaultMethod(unresolvedCall));
+		}
+
+		private List<String> extractAliases(UnresolvedCallExpression unresolvedCall) {
+			return unresolvedCall.getChildren()
+								.subList(1, unresolvedCall.getChildren().size())
+								.stream()
+								.map(ex -> ExpressionUtils.extractValue(ex, String.class)
+									.orElseThrow(() -> new TableException("Expected string literal as alias.")))
+								.collect(Collectors.toList());
+		}
+
+		private Optional<AggregateWithAlias> getAggregate(
+				UnresolvedCallExpression unresolvedCall,
+				List<String> aliases) {
+			FunctionDefinition functionDefinition = unresolvedCall.getFunctionDefinition();
+			if (ApiExpressionUtils.isFunctionOfKind(unresolvedCall, FunctionKind.AGGREGATE)) {
+				final List<String> fieldNames;
+				if (aliases.isEmpty()) {
+					if (functionDefinition instanceof AggregateFunctionDefinition) {
+						TypeInformation<?> resultTypeInfo = ((AggregateFunctionDefinition) functionDefinition)
+							.getResultTypeInfo();
+						fieldNames = Arrays.asList(FieldInfoUtils.getFieldNames(resultTypeInfo));
+					} else {
+						fieldNames = Collections.emptyList();
+					}
+				} else {
+					fieldNames = aliases;
+				}
+				return Optional.of(new AggregateWithAlias(unresolvedCall, fieldNames));
+			} else {
+				return Optional.empty();
+			}
+		}
+
+		@Override
+		protected AggregateWithAlias defaultMethod(Expression expression) {
+			throw new ValidationException("Aggregate function expected. Got: " + expression);
+		}
+	}
+
+	public QueryOperation tableAggregate(
+			List<Expression> groupingExpressions,
+			Expression tableAggFunction,
+			QueryOperation child) {
+
+		// Step1: add a default name to the call in the grouping expressions, e.g., groupBy(a % 5) to
+		// groupBy(a % 5 as TMP_0). We need a name for every column so that we can alias the output
+		// of the table aggregate function in Step4.
+		List<Expression> newGroupingExpressions = addAliasToTheCallInGroupings(
+			Arrays.asList(child.getTableSchema().getFieldNames()),
+			groupingExpressions);
+
+		// Step2: resolve expressions
+		ExpressionResolver resolver = getResolver(child);
+		List<ResolvedExpression> resolvedGroupings = resolver.resolve(newGroupingExpressions);
+		Tuple2<ResolvedExpression, List<String>> resolvedFunctionAndAlias =
+			aggregateOperationFactory.extractTableAggFunctionAndAliases(
+				resolveSingleExpression(tableAggFunction, resolver));
+
+		// Step3: create table agg operation
+		QueryOperation tableAggOperation = aggregateOperationFactory
+			.createAggregate(resolvedGroupings, Collections.singletonList(resolvedFunctionAndAlias.f0), child);
+
+		// Step4: add a top project to alias the output fields of the table aggregate.
+		return aliasBackwardFields(tableAggOperation, resolvedFunctionAndAlias.f1, groupingExpressions.size());
+	}
+
+	public QueryOperation windowTableAggregate(
+			List<Expression> groupingExpressions,
+			GroupWindow window,
+			List<Expression> windowProperties,
+			Expression tableAggFunction,
+			QueryOperation child) {
+
+		// Step1: add a default name to the call in the grouping expressions, e.g., groupBy(a % 5) to
+		// groupBy(a % 5 as TMP_0). We need a name for every column so that we can alias the output
+		// of the table aggregate function in Step4.
+		List<Expression> newGroupingExpressions = addAliasToTheCallInGroupings(
+			Arrays.asList(child.getTableSchema().getFieldNames()),
+			groupingExpressions);
+
+		// Step2: resolve expressions, including grouping, aggregates and window properties.
+		ExpressionResolver resolver = getResolver(child);
+		ResolvedGroupWindow resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver);
+
+		ExpressionResolver resolverWithWindowReferences = ExpressionResolver.resolverFor(
+			tableReferenceLookup,
+			functionCatalog,
+			child)
+			.withLocalReferences(
+				new LocalReferenceExpression(
+					resolvedWindow.getAlias(),
+					resolvedWindow.getTimeAttribute().getOutputDataType()))
+			.build();
+
+		List<ResolvedExpression> convertedGroupings = resolverWithWindowReferences.resolve(newGroupingExpressions);
+		List<ResolvedExpression> convertedAggregates = resolverWithWindowReferences.resolve(Collections.singletonList(
+			tableAggFunction));
+		List<ResolvedExpression> convertedProperties = resolverWithWindowReferences.resolve(windowProperties);
+		Tuple2<ResolvedExpression, List<String>> resolvedFunctionAndAlias = aggregateOperationFactory
+			.extractTableAggFunctionAndAliases(convertedAggregates.get(0));
+
+		// Step3: create window table agg operation
+		QueryOperation tableAggOperation = aggregateOperationFactory.createWindowAggregate(
+			convertedGroupings,
+			Collections.singletonList(resolvedFunctionAndAlias.f0),
+			convertedProperties,
+			resolvedWindow,
+			child);
+
+		// Step4: add a top project to alias the output fields of the table aggregate. Also, project the
+		// window attribute.
+		return aliasBackwardFields(tableAggOperation, resolvedFunctionAndAlias.f1, groupingExpressions.size());
+	}
+
+	/**
+	 * Renames fields in the input {@link QueryOperation}.
+	 */
+	private QueryOperation aliasBackwardFields(
+		QueryOperation inputOperation,
+		List<String> alias,
+		int aliasStartIndex) {
+
+		if (!alias.isEmpty()) {
+			String[] namesBeforeAlias = inputOperation.getTableSchema().getFieldNames();
+			List<String> namesAfterAlias = new ArrayList<>(Arrays.asList(namesBeforeAlias));
+			for (int i = 0; i < alias.size(); i++) {
+				int withOffset = aliasStartIndex + i;
+				namesAfterAlias.remove(withOffset);
+				namesAfterAlias.add(withOffset, alias.get(i));
+			}
+
+			return this.alias(namesAfterAlias.stream()
+				.map(UnresolvedReferenceExpression::new)
+				.collect(Collectors.toList()), inputOperation);
+		} else {
+			return inputOperation;
+		}
+	}
+
+	/**
+	 * Adds a default name to the call in the grouping expressions, e.g., groupBy(a % 5) to
+	 * groupBy(a % 5 as TMP_0).
+	 */
+	private List<Expression> addAliasToTheCallInGroupings(
+		List<String> inputFieldNames,
+		List<Expression> groupingExpressions) {
+
+		int attrNameCntr = 0;
+		Set<String> usedFieldNames = new HashSet<>(inputFieldNames);
+
+		List<Expression> result = new ArrayList<>();
+		for (Expression groupingExpression : groupingExpressions) {
+			if (groupingExpression instanceof UnresolvedCallExpression &&
+				!ApiExpressionUtils.isFunction(groupingExpression, BuiltInFunctionDefinitions.AS)) {
+				String tempName = getUniqueName("TMP_" + attrNameCntr, usedFieldNames);
+				attrNameCntr += 1;
+				usedFieldNames.add(tempName);
+				result.add(unresolvedCall(
+					BuiltInFunctionDefinitions.AS,
+					groupingExpression,
+					valueLiteral(tempName)));
+			} else {
+				result.add(groupingExpression);
+			}
+		}
+
+		return result;
+	}
+
+	/**
+	 * Returns a unique name derived from the input name that does not exist in usedFieldNames.
+	 */
+	private String getUniqueName(String inputName, Collection<String> usedFieldNames) {
+		int i = 0;
+		String resultName = inputName;
+		while (usedFieldNames.contains(resultName)) {
+			resultName = resultName + "_" + i;
+			i += 1;
+		}
+		return resultName;
+	}
+
+	private ExpressionResolver getResolver(QueryOperation child) {
+		return ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, child).build();
+	}
+
+	private static class NoWindowPropertyChecker extends ApiExpressionDefaultVisitor<Void> {
+		private final String exceptionMessage;
+
+		private NoWindowPropertyChecker(String exceptionMessage) {
+			this.exceptionMessage = exceptionMessage;
+		}
+
+		@Override
+		public Void visit(UnresolvedCallExpression call) {
+			FunctionDefinition functionDefinition = call.getFunctionDefinition();
+			if (BuiltInFunctionDefinitions.WINDOW_PROPERTIES.contains(functionDefinition)) {
+				throw new ValidationException(exceptionMessage);
+			}
+			call.getChildren().forEach(expr -> expr.accept(this));
+			return null;
+		}
+
+		@Override
+		protected Void defaultMethod(Expression expression) {
+			return null;
+		}
+	}
+
+	private static class NoAggregateChecker extends ApiExpressionDefaultVisitor<Void> {
+		private final String exceptionMessage;
+
+		private NoAggregateChecker(String exceptionMessage) {
+			this.exceptionMessage = exceptionMessage;
+		}
+
+		@Override
+		public Void visit(UnresolvedCallExpression call) {
+			if (ApiExpressionUtils.isFunctionOfKind(call, FunctionKind.AGGREGATE)) {
+				throw new ValidationException(exceptionMessage);
+			}
+			call.getChildren().forEach(expr -> expr.accept(this));
+			return null;
+		}
+
+		@Override
+		protected Void defaultMethod(Expression expression) {
+			return null;
+		}
+	}
 }
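
One detail of the name mangling in getUniqueName above: the counter suffix is
appended to the previous candidate rather than to the original name, so suffixes
accumulate on repeated collisions. Hypothetical inputs to illustrate:

    getUniqueName("TMP_0", Arrays.asList("a", "b"));            // -> "TMP_0"
    getUniqueName("TMP_0", Arrays.asList("TMP_0"));             // -> "TMP_0_0"
    getUniqueName("TMP_0", Arrays.asList("TMP_0", "TMP_0_0"));  // -> "TMP_0_0_1"
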
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 96b7403..990f043 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -55,13 +55,15 @@ class StreamTableEnvironmentImpl (
     config: TableConfig,
     scalaExecutionEnvironment: StreamExecutionEnvironment,
     planner: Planner,
-    executor: Executor)
+    executor: Executor,
+    isStreaming: Boolean)
   extends TableEnvironmentImpl(
     catalogManager,
     config,
     executor,
     functionCatalog,
-    planner)
+    planner,
+    isStreaming)
   with org.apache.flink.table.api.scala.StreamTableEnvironment {
 
   override def fromDataStream[T](dataStream: DataStream[T]): Table = {
@@ -262,7 +264,9 @@ object StreamTableEnvironmentImpl {
       tableConfig,
       executionEnvironment,
       planner,
-      executor)
+      executor,
+      !settings.isBatchMode
+    )
   }
 
   private def lookupExecutor(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java
index 90aece5..951be99 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java
@@ -32,14 +32,14 @@ public final class ExpressionUtils {
 	 * Extracts the value (excluding null) of a given class from an expression assuming it is a
 	 * {@link ValueLiteralExpression}.
 	 *
-	 * @param expr literal to extract the value from
+	 * @param expression literal to extract the value from
 	 * @param targetClass expected class to extract from the literal
 	 * @param <V> type of extracted value
 	 * @return extracted value or empty if could not extract value of given type
 	 */
-	public static <V> Optional<V> extractValue(Expression expr, Class<V> targetClass) {
-		if (expr instanceof ValueLiteralExpression) {
-			final ValueLiteralExpression valueLiteral = (ValueLiteralExpression) expr;
+	public static <V> Optional<V> extractValue(Expression expression, Class<V> targetClass) {
+		if (expression instanceof ValueLiteralExpression) {
+			final ValueLiteralExpression valueLiteral = (ValueLiteralExpression) expression;
 			return valueLiteral.getValueAs(targetClass);
 		}
 		return Optional.empty();
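
extractValue unwraps a typed value only from a ValueLiteralExpression; any other
expression yields Optional.empty(). A minimal sketch, assuming the single-argument
ValueLiteralExpression constructor:

    Expression literal = new ValueLiteralExpression("name");
    Optional<String> value = ExpressionUtils.extractValue(literal, String.class);
    // value == Optional.of("name")
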
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java
deleted file mode 100644
index 1a01307..0000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.operations;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.FunctionLookup;
-import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
-
-/**
- * Temporary solution for looking up the {@link OperationTreeBuilder}. The tree builder
- * should be moved to api module once the type inference is in place.
- */
-@Internal
-public final class OperationTreeBuilderFactory {
-
-	public static OperationTreeBuilder create(
-			TableReferenceLookup tableReferenceLookup,
-			FunctionLookup functionLookup) {
-		return new OperationTreeBuilderImpl(
-			tableReferenceLookup,
-			functionLookup,
-			true
-		);
-	}
-
-	private OperationTreeBuilderFactory() {
-	}
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index efa29ed..0041141 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -18,12 +18,6 @@
 
 package org.apache.flink.table.api.internal
 
-import _root_.java.util.Optional
-
-import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
-import org.apache.calcite.sql._
-import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.tools.FrameworkConfig
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api._
@@ -40,6 +34,13 @@ import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.JavaScalaConversionUtil
 import org.apache.flink.util.StringUtils
 
+import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.parser.SqlParser
+import org.apache.calcite.tools.FrameworkConfig
+
+import _root_.java.util.Optional
+
 import _root_.scala.collection.JavaConverters._
 
 /**
@@ -73,9 +74,9 @@ abstract class TableEnvImpl(
     }
   }
 
-  private[flink] val operationTreeBuilder = new OperationTreeBuilderImpl(
-    tableLookup,
+  private[flink] val operationTreeBuilder = OperationTreeBuilder.create(
     functionCatalog,
+    tableLookup,
     !isBatch)
 
   protected val planningConfigurationBuilder: PlanningConfigurationBuilder =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala
deleted file mode 100644
index 03a9bc3..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala
+++ /dev/null
@@ -1,600 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.operations
-
-import java.util.{Collections, Optional, List => JList}
-import org.apache.flink.table.api._
-import org.apache.flink.table.catalog.FunctionLookup
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.expressions.resolver.ExpressionResolver.resolverFor
-import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
-import org.apache.flink.table.expressions.resolver.{ExpressionResolver, LookupCallResolver}
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{isFunctionOfKind, unresolvedCall, unresolvedRef, valueLiteral}
-import org.apache.flink.table.expressions.utils.{ApiExpressionDefaultVisitor, ApiExpressionUtils}
-import org.apache.flink.table.functions.FunctionKind.{SCALAR, TABLE}
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-import org.apache.flink.table.functions.{AggregateFunctionDefinition, BuiltInFunctionDefinitions, TableFunctionDefinition}
-import org.apache.flink.table.operations.JoinQueryOperation.JoinType
-import org.apache.flink.table.operations.OperationExpressionsUtils.extractAggregationsAndProperties
-import org.apache.flink.table.operations.SetQueryOperation.SetQueryOperationType._
-import org.apache.flink.table.operations.utils.factories.AliasOperationUtils.createAliasList
-import org.apache.flink.table.operations.utils.factories._
-import org.apache.flink.table.types.logical.LogicalTypeRoot
-import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
-import org.apache.flink.table.util.JavaScalaConversionUtil.toScala
-import org.apache.flink.util.Preconditions
-
-import _root_.scala.collection.JavaConversions._
-import _root_.scala.collection.JavaConverters._
-
-/**
-  * Builder for [[[Operation]] tree.
-  *
-  * The operation tree builder resolves expressions such that factories only work with fully
-  * [[ResolvedExpression]]s.
-  */
-class OperationTreeBuilderImpl(
-    tableCatalog: TableReferenceLookup,
-    functionCatalog: FunctionLookup,
-    isStreaming: Boolean)
-  extends OperationTreeBuilder{
-
-  private val lookupResolver = new LookupCallResolver(functionCatalog)
-  private val projectionOperationFactory = new ProjectionOperationFactory()
-  private val sortOperationFactory = new SortOperationFactory(isStreaming)
-  private val calculatedTableFactory = new CalculatedTableFactory()
-  private val setOperationFactory = new SetOperationFactory(isStreaming)
-  private val aggregateOperationFactory = new AggregateOperationFactory(isStreaming)
-  private val joinOperationFactory = new JoinOperationFactory()
-
-  private val noWindowPropertyChecker = new NoWindowPropertyChecker(
-    "Window start and end properties are not available for Over windows.")
-
-  override def project(
-      projectList: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-    project(projectList, child, explicitAlias = false)
-  }
-
-  override def project(
-      projectList: JList[Expression],
-      child: QueryOperation,
-      explicitAlias: Boolean)
-    : QueryOperation = {
-    projectInternal(projectList, child, explicitAlias, Collections.emptyList())
-  }
-
-  override def project(
-      projectList: JList[Expression],
-      child: QueryOperation,
-      overWindows: JList[OverWindow])
-    : QueryOperation = {
-
-    Preconditions.checkArgument(!overWindows.isEmpty)
-
-    projectList.asScala.map(_.accept(noWindowPropertyChecker))
-
-    projectInternal(projectList,
-      child,
-      explicitAlias = true,
-      overWindows)
-  }
-
-  private def projectInternal(
-      projectList: JList[Expression],
-      child: QueryOperation,
-      explicitAlias: Boolean,
-      overWindows: JList[OverWindow])
-    : QueryOperation = {
-
-    validateProjectList(projectList, overWindows)
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child)
-      .withOverWindows(overWindows)
-      .build
-    val projections = resolver.resolve(projectList)
-    projectionOperationFactory.create(
-      projections,
-      child,
-      explicitAlias,
-      resolver.postResolverFactory())
-  }
-
-  /**
-    * Window properties and aggregate function should not exist in the plain project, i.e., window
-    * properties should exist in the window operators and aggregate functions should exist in
-    * the aggregate operators.
-    */
-  private def validateProjectList(
-      projectList: JList[Expression],
-      overWindows: JList[OverWindow])
-    : Unit = {
-
-    val expressionsWithResolvedCalls = projectList.map(_.accept(lookupResolver)).asJava
-    val extracted = extractAggregationsAndProperties(expressionsWithResolvedCalls)
-    if (!extracted.getWindowProperties.isEmpty) {
-      throw new ValidationException("Window properties can only be used on windowed tables.")
-    }
-
-    // aggregate functions can't exist in the plain project except for the over window case
-    if (!extracted.getAggregations.isEmpty && overWindows.isEmpty) {
-      throw new ValidationException("Aggregate functions are not supported in the select right" +
-        " after the aggregate or flatAggregate operation.")
-    }
-  }
-
-  /**
-    * Adds additional columns. Existing fields will be replaced if replaceIfExist is true.
-    */
-  override def addColumns(
-      replaceIfExist: Boolean,
-      fieldLists: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-    val newColumns = if (replaceIfExist) {
-      val fieldNames = child.getTableSchema.getFieldNames.toList.asJava
-      ColumnOperationUtils.addOrReplaceColumns(fieldNames, fieldLists)
-    } else {
-      (unresolvedRef("*") +: fieldLists.asScala).asJava
-    }
-    project(newColumns, child)
-  }
-
-  override def renameColumns(
-      aliases: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child)
-      .build
-
-    val inputFieldNames = child.getTableSchema.getFieldNames.toList.asJava
-    val validateAliases = ColumnOperationUtils.renameColumns(
-      inputFieldNames,
-      resolver.resolveExpanding(aliases))
-
-    project(validateAliases, child)
-  }
-
-  override def dropColumns(
-      fieldList: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child)
-      .build
-
-    val inputFieldNames = child.getTableSchema.getFieldNames.toList.asJava
-    val finalFields = ColumnOperationUtils.dropFields(
-      inputFieldNames,
-      resolver.resolveExpanding(fieldList))
-
-    project(finalFields, child)
-  }
-
-  override def aggregate(
-      groupingExpressions: JList[Expression],
-      aggregates: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build
-
-    val resolvedGroupings = resolver.resolve(groupingExpressions)
-    val resolvedAggregates = resolver.resolve(aggregates)
-
-    aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
-  }
-
-  /**
-    * Row based aggregate that will flatten the output if it is a composite type.
-    */
-  override def aggregate(
-      groupingExpressions: JList[Expression],
-      aggregate: Expression,
-      child: QueryOperation)
-    : QueryOperation = {
-    val resolvedAggregate = aggregate.accept(lookupResolver)
-
-    // extract alias and aggregate function
-    var alias: Seq[String] = Seq()
-    val aggWithoutAlias = resolvedAggregate match {
-      case c: UnresolvedCallExpression
-          if c.getFunctionDefinition == BuiltInFunctionDefinitions.AS =>
-        alias = c.getChildren
-          .drop(1)
-          .map(e => ExpressionUtils.extractValue(e, classOf[String]).get())
-        c.getChildren.get(0)
-      case c: UnresolvedCallExpression
-        if c.getFunctionDefinition.isInstanceOf[AggregateFunctionDefinition] =>
-        if (alias.isEmpty) alias = UserDefinedFunctionUtils.getFieldInfo(
-          c.getFunctionDefinition.asInstanceOf[AggregateFunctionDefinition].getResultTypeInfo)._1
-        c
-      case e => e
-    }
-
-    // turn the aggregate into a named aggregate, because it will be validated later.
-    var cnt = 0
-    val childNames = child.getTableSchema.getFieldNames
-    while (childNames.contains("TMP_" + cnt)) {
-      cnt += 1
-    }
-    val aggWithNamedAlias = unresolvedCall(
-      BuiltInFunctionDefinitions.AS,
-      aggWithoutAlias,
-      valueLiteral("TMP_" + cnt))
-
-    // get agg table
-    val aggQueryOperation = this.aggregate(groupingExpressions, Seq(aggWithNamedAlias), child)
-
-    // flatten the aggregate function
-    val aggNames = aggQueryOperation.getTableSchema.getFieldNames
-    val flattenExpressions = aggNames.take(groupingExpressions.size())
-      .map(e => unresolvedRef(e)) ++
-      Seq(unresolvedCall(BuiltInFunctionDefinitions.FLATTEN, unresolvedRef(aggNames.last)))
-    val flattenedOperation = this.project(flattenExpressions.toList, aggQueryOperation)
-
-    // add alias
-    aliasBackwardFields(flattenedOperation, alias, groupingExpressions.size())
-  }
-
-  override def tableAggregate(
-      groupingExpressions: JList[Expression],
-      tableAggFunction: Expression,
-      child: QueryOperation)
-    : QueryOperation = {
-
-    // Step1: add a default name to the call in the grouping expressions, e.g., groupBy(a % 5)
-    // becomes groupBy(a % 5 as TMP_0). We need a name for every column so that we can alias the
-    // output of the table aggregate function in Step4.
-    val newGroupingExpressions = addAliasToTheCallInGroupings(
-      child.getTableSchema.getFieldNames,
-      groupingExpressions)
-
-    // Step2: resolve expressions
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build
-    val resolvedGroupings = resolver.resolve(newGroupingExpressions)
-    val resolvedFunctionAndAlias = aggregateOperationFactory.extractTableAggFunctionAndAliases(
-      resolveSingleExpression(tableAggFunction, resolver))
-
-    // Step3: create table agg operation
-    val tableAggOperation = aggregateOperationFactory
-      .createAggregate(resolvedGroupings, Seq(resolvedFunctionAndAlias.f0), child)
-
-    // Step4: add a top-level projection to alias the output fields of the table aggregate.
-    aliasBackwardFields(tableAggOperation, resolvedFunctionAndAlias.f1, groupingExpressions.size())
-  }
-
-  override def windowAggregate(
-      groupingExpressions: JList[Expression],
-      window: GroupWindow,
-      windowProperties: JList[Expression],
-      aggregates: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
-    val resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver)
-
-    val resolverWithWindowReferences = resolverFor(tableCatalog, functionCatalog, child)
-      .withLocalReferences(
-        new LocalReferenceExpression(
-          resolvedWindow.getAlias,
-          resolvedWindow.getTimeAttribute.getOutputDataType))
-      .build
-
-    val convertedGroupings = resolverWithWindowReferences.resolve(groupingExpressions)
-
-    val convertedAggregates = resolverWithWindowReferences.resolve(aggregates)
-
-    val convertedProperties = resolverWithWindowReferences.resolve(windowProperties)
-
-    aggregateOperationFactory.createWindowAggregate(
-      convertedGroupings,
-      convertedAggregates,
-      convertedProperties,
-      resolvedWindow,
-      child)
-  }
-
-  override def windowTableAggregate(
-      groupingExpressions: JList[Expression],
-      window: GroupWindow,
-      windowProperties: JList[Expression],
-      tableAggFunction: Expression,
-      child: QueryOperation)
-    : QueryOperation = {
-
-    // Step1: add a default name to the call in the grouping expressions, e.g., groupBy(a % 5)
-    // becomes groupBy(a % 5 as TMP_0). We need a name for every column so that we can alias the
-    // output of the table aggregate function in Step4.
-    val newGroupingExpressions = addAliasToTheCallInGroupings(
-      child.getTableSchema.getFieldNames,
-      groupingExpressions)
-
-    // Step2: resolve expressions, including grouping, aggregates and window properties.
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
-    val resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver)
-
-    val resolverWithWindowReferences = resolverFor(tableCatalog, functionCatalog, child)
-      .withLocalReferences(
-        new LocalReferenceExpression(
-          resolvedWindow.getAlias,
-          resolvedWindow.getTimeAttribute.getOutputDataType))
-      .build
-
-    val convertedGroupings = resolverWithWindowReferences.resolve(newGroupingExpressions)
-    val convertedAggregates = resolverWithWindowReferences.resolve(Seq(tableAggFunction))
-    val convertedProperties = resolverWithWindowReferences.resolve(windowProperties)
-    val resolvedFunctionAndAlias = aggregateOperationFactory.extractTableAggFunctionAndAliases(
-      convertedAggregates.get(0))
-
-    // Step3: create window table agg operation
-    val tableAggOperation = aggregateOperationFactory.createWindowAggregate(
-      convertedGroupings,
-      Seq(resolvedFunctionAndAlias.f0),
-      convertedProperties,
-      resolvedWindow,
-      child)
-
-    // Step4: add a top-level projection to alias the output fields of the table aggregate and
-    // to keep the window attribute.
-    aliasBackwardFields(tableAggOperation, resolvedFunctionAndAlias.f1, groupingExpressions.size())
-  }
-
-  override def join(
-      left: QueryOperation,
-      right: QueryOperation,
-      joinType: JoinType,
-      condition: Optional[Expression],
-      correlated: Boolean)
-    : QueryOperation = {
-    val resolver = resolverFor(tableCatalog, functionCatalog, left, right).build()
-    val resolvedCondition = toScala(condition).map(expr => resolveSingleExpression(expr, resolver))
-
-    joinOperationFactory
-      .create(left, right, joinType, resolvedCondition.getOrElse(valueLiteral(true)), correlated)
-  }
-
-  override def joinLateral(
-      left: QueryOperation,
-      tableFunction: Expression,
-      joinType: JoinType,
-      condition: Optional[Expression])
-    : QueryOperation = {
-    val resolver = resolverFor(tableCatalog, functionCatalog, left).build()
-    val resolvedFunction = resolveSingleExpression(tableFunction, resolver)
-
-    val temporalTable =
-      calculatedTableFactory.create(resolvedFunction, left.getTableSchema.getFieldNames)
-
-    join(left, temporalTable, joinType, condition, correlated = true)
-  }
-
-  override def resolveExpression(expression: Expression, queryOperation: QueryOperation*)
-    : Expression = {
-    val resolver = resolverFor(tableCatalog, functionCatalog, queryOperation: _*).build()
-
-    resolveSingleExpression(expression, resolver)
-  }
-
-  private def resolveSingleExpression(
-      expression: Expression,
-      resolver: ExpressionResolver)
-    : ResolvedExpression = {
-    val resolvedExpression = resolver.resolve(List(expression).asJava)
-    if (resolvedExpression.size() != 1) {
-      throw new ValidationException("Expected single expression")
-    } else {
-      resolvedExpression.get(0)
-    }
-  }
-
-  override def sort(
-      fields: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
-    val resolvedFields = resolver.resolve(fields)
-
-    sortOperationFactory.createSort(resolvedFields, child, resolver.postResolverFactory())
-  }
-
-  override def limitWithOffset(offset: Int, child: QueryOperation): QueryOperation = {
-    sortOperationFactory.createLimitWithOffset(offset, child)
-  }
-
-  override def limitWithFetch(fetch: Int, child: QueryOperation): QueryOperation = {
-    sortOperationFactory.createLimitWithFetch(fetch, child)
-  }
-
-  override def alias(
-      fields: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val newFields = createAliasList(fields, child)
-
-    project(newFields, child, explicitAlias = true)
-  }
-
-  override def filter(
-      condition: Expression,
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
-    val resolvedExpression = resolveSingleExpression(condition, resolver)
-    val conditionType = resolvedExpression.getOutputDataType
-    if (!LogicalTypeChecks.hasRoot(conditionType.getLogicalType, LogicalTypeRoot.BOOLEAN)) {
-      throw new ValidationException(s"Filter operator requires a boolean expression as input," +
-        s" but $condition is of type ${conditionType}")
-    }
-
-    new FilterQueryOperation(resolvedExpression, child)
-  }
-
-  override def distinct(
-      child: QueryOperation)
-    : QueryOperation = {
-    new DistinctQueryOperation(child)
-  }
-
-  override def minus(
-      left: QueryOperation,
-      right: QueryOperation,
-      all: Boolean)
-    : QueryOperation = {
-    setOperationFactory.create(MINUS, left, right, all)
-  }
-
-  override def intersect(
-      left: QueryOperation,
-      right: QueryOperation,
-      all: Boolean)
-    : QueryOperation = {
-    setOperationFactory.create(INTERSECT, left, right, all)
-  }
-
-  override def union(
-      left: QueryOperation,
-      right: QueryOperation,
-      all: Boolean)
-    : QueryOperation = {
-    setOperationFactory.create(UNION, left, right, all)
-  }
-
-  override def map(mapFunction: Expression, child: QueryOperation): QueryOperation = {
-
-    val resolvedMapFunction = mapFunction.accept(lookupResolver)
-
-    if (!isFunctionOfKind(resolvedMapFunction, SCALAR)) {
-      throw new ValidationException("Only a scalar function can be used in the map operator.")
-    }
-
-    val expandedFields = unresolvedCall(BuiltInFunctionDefinitions.FLATTEN, resolvedMapFunction)
-    project(Collections.singletonList(expandedFields), child)
-  }
-
-  override def flatMap(tableFunction: Expression, child: QueryOperation): QueryOperation = {
-
-    val resolvedTableFunction = tableFunction.accept(lookupResolver)
-
-    if (!isFunctionOfKind(resolvedTableFunction, TABLE)) {
-      throw new ValidationException("Only a table function can be used in the flatMap operator.")
-    }
-
-    val originFieldNames: Seq[String] =
-      resolvedTableFunction.asInstanceOf[UnresolvedCallExpression].getFunctionDefinition match {
-        case tfd: TableFunctionDefinition =>
-          UserDefinedFunctionUtils.getFieldInfo(tfd.getResultType)._1
-      }
-
-    val usedFieldNames = child.getTableSchema.getFieldNames.toBuffer
-    val newFieldNames = originFieldNames.map({ e =>
-      val resultName = getUniqueName(e, usedFieldNames)
-      usedFieldNames.append(resultName)
-      resultName
-    })
-
-    val renamedTableFunction = unresolvedCall(
-      BuiltInFunctionDefinitions.AS,
-      resolvedTableFunction +: newFieldNames.map(ApiExpressionUtils.valueLiteral(_)): _*)
-    val joinNode = joinLateral(child, renamedTableFunction, JoinType.INNER, Optional.empty())
-    val rightNode = dropColumns(
-      child.getTableSchema.getFieldNames.map(ApiExpressionUtils.unresolvedRef).toList,
-      joinNode)
-    alias(originFieldNames.map(a => unresolvedRef(a)), rightNode)
-  }
-
-  /**
-    * Returns a unique name, derived from the input name, that does not exist in usedFieldNames.
-    */
-  private def getUniqueName(inputName: String, usedFieldNames: Seq[String]): String = {
-    var i = 0
-    var resultName = inputName
-    while (usedFieldNames.contains(resultName)) {
-      resultName = resultName + "_" + i
-      i += 1
-    }
-    resultName
-  }
-
-  /**
-    * Adds a default name to calls in the grouping expressions, e.g., groupBy(a % 5) becomes
-    * groupBy(a % 5 as TMP_0).
-    */
-  private def addAliasToTheCallInGroupings(
-      inputFieldNames: Seq[String],
-      groupingExpressions: JList[Expression])
-    : JList[Expression] = {
-
-    var attrNameCntr: Int = 0
-    val usedFieldNames = inputFieldNames.toBuffer
-    groupingExpressions.map {
-      case c: UnresolvedCallExpression
-          if c.getFunctionDefinition != BuiltInFunctionDefinitions.AS =>
-        val tempName = getUniqueName("TMP_" + attrNameCntr, usedFieldNames)
-        usedFieldNames.append(tempName)
-        attrNameCntr += 1
-        unresolvedCall(
-          BuiltInFunctionDefinitions.AS,
-          c,
-          valueLiteral(tempName)
-        )
-      case e => e
-    }
-  }
-
-  /**
-    * Renames fields in the input [[QueryOperation]].
-    */
-  private def aliasBackwardFields(
-      inputOperation: QueryOperation,
-      alias: Seq[String],
-      aliasStartIndex: Int)
-    : QueryOperation = {
-
-    if (alias.nonEmpty) {
-      val namesBeforeAlias = inputOperation.getTableSchema.getFieldNames
-      val namesAfterAlias = namesBeforeAlias.take(aliasStartIndex) ++ alias ++
-        namesBeforeAlias.takeRight(namesBeforeAlias.length - alias.size - aliasStartIndex)
-      this.alias(namesAfterAlias.map(e => unresolvedRef(e)).toList, inputOperation)
-    } else {
-      inputOperation
-    }
-  }
-
-  class NoWindowPropertyChecker(val exceptionMessage: String)
-    extends ApiExpressionDefaultVisitor[Void] {
-    override def visit(unresolvedCall: UnresolvedCallExpression): Void = {
-      val functionDefinition = unresolvedCall.getFunctionDefinition
-      if (BuiltInFunctionDefinitions.WINDOW_PROPERTIES
-        .contains(functionDefinition)) {
-        throw new ValidationException(exceptionMessage)
-      }
-      unresolvedCall.getChildren.asScala.foreach(expr => expr.accept(this))
-      null
-    }
-
-    override protected def defaultMethod(expression: Expression): Void = null
-  }
-}
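For reference, the synthetic-name scheme used by the removed builder above can be exercised in isolation. Below is a minimal, standalone Scala sketch of getUniqueName and the TMP_n counter from the row-based aggregate; the object name and main method are illustrative only and not part of the ported Java API. A companion sketch of the aliasBackwardFields splicing follows the AggregateTest hunk further below.

    object UniqueNameSketch {

      // Mirrors getUniqueName above: append "_0", "_1", ... to the evolving
      // candidate until it no longer clashes with a used field name.
      def getUniqueName(inputName: String, usedFieldNames: Seq[String]): String = {
        var i = 0
        var resultName = inputName
        while (usedFieldNames.contains(resultName)) {
          resultName = resultName + "_" + i
          i += 1
        }
        resultName
      }

      // Mirrors the TMP_n counter in the row-based aggregate: picks the first
      // TMP_<cnt> that is not already a field of the child operation.
      def firstFreeTmpName(childNames: Seq[String]): String = {
        var cnt = 0
        while (childNames.contains("TMP_" + cnt)) {
          cnt += 1
        }
        "TMP_" + cnt
      }

      def main(args: Array[String]): Unit = {
        // "f0" and "f0_0" are taken, so the candidate evolves
        // f0 -> f0_0 -> f0_0_1 before a free name is found.
        println(getUniqueName("f0", Seq("f0", "f0_0")))  // f0_0_1
        println(firstFreeTmpName(Seq("a", "TMP_0")))     // TMP_1
      }
    }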
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 44dc4ef..20e8ba6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -215,7 +215,8 @@ class StreamTableEnvironmentTest extends TableTestBase {
       config,
       jStreamExecEnv,
       streamPlanner,
-      executor)
+      executor,
+      true)
 
     val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
       .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
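The trailing "true" added to the constructor calls in this and the following test hunks is the new streaming-mode flag threaded through by this commit. As a purely hypothetical readability aside (not part of this change), a raw trailing boolean can be named at the call site; the stand-in class below is illustrative only and not Flink API:

    // Hypothetical stand-in for the real environment class; not Flink API.
    class EnvSketch(val planner: String, val executor: String, val isStreaming: Boolean)

    object FlagNamingSketch {
      def main(args: Array[String]): Unit = {
        // Binding the boolean to a named val documents what the "true"
        // argument means at construction sites like the ones in this diff.
        val isStreaming = true
        val env = new EnvSketch("streamPlanner", "executor", isStreaming)
        println(s"streaming mode: ${env.isStreaming}")
      }
    }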
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index d435809..b56919d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -73,7 +73,8 @@ class AggregateTest extends TableTestBase {
       new TableConfig,
       Mockito.mock(classOf[StreamExecutionEnvironment]),
       Mockito.mock(classOf[Planner]),
-      Mockito.mock(classOf[Executor])
+      Mockito.mock(classOf[Executor]),
+      true
     )
 
     tablEnv.registerFunction("udag", new MyAgg)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index e72225d..e85e7e4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -338,7 +338,8 @@ case class StreamTableTestUtil(
     tableConfig,
     javaEnv,
     streamPlanner,
-    executor)
+    executor,
+    true)
 
   val env = new StreamExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaStreamTableEnvironmentImpl(
@@ -347,7 +348,8 @@ case class StreamTableTestUtil(
     tableConfig,
     env,
     streamPlanner,
-    executor)
+    executor,
+    true)
 
   def addTable[T: TypeInformation](
       name: String,