You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/02 15:46:38 UTC

[flink] branch master updated (d903480 -> 6cc8e44)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d903480  [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
     new 39e4ad2  [hotfix][table-planner][table-api-java] Move QueryOperation factories to table-api-java
     new 1d167a3  [FLINK-12906][table-planner][table-api-java] Ported OperationTreeBuilder to table-api-java module
     new 6cc8e44  [hotfix][table-api-java] Moved QueryOperation utilities to o.a.f.t.operations.utils

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/internal/StreamTableEnvironmentImpl.java  |   8 +-
 .../flink/table/api/EnvironmentSettings.java       |   7 +
 .../table/api/internal/TableEnvironmentImpl.java   |  30 +-
 .../apache/flink/table/api/internal/TableImpl.java |   6 +-
 .../expressions/utils/ApiExpressionUtils.java      |  29 +-
 .../table/operations/OperationTreeBuilder.java     | 110 ----
 .../{ => utils}/OperationExpressionsUtils.java     |   7 +-
 .../operations/utils/OperationTreeBuilder.java     | 697 +++++++++++++++++++++
 .../{ => utils}/QueryOperationDefaultVisitor.java  |  15 +-
 .../factories}/AggregateOperationFactory.java      |   7 +-
 .../utils/factories}/AliasOperationUtils.java      |   3 +-
 .../utils/factories}/CalculatedTableFactory.java   |   4 +-
 .../utils/factories}/ColumnOperationUtils.java     |   4 +-
 .../utils/factories}/JoinOperationFactory.java     |   5 +-
 .../factories}/ProjectionOperationFactory.java     |   8 +-
 .../utils/factories}/SetOperationFactory.java      |   4 +-
 .../utils/factories}/SortOperationFactory.java     |   4 +-
 .../internal/StreamTableEnvironmentImpl.scala      |  10 +-
 .../flink/table/expressions/ExpressionUtils.java   |   8 +-
 .../flink/table/plan/QueryOperationConverter.java  |   2 +-
 .../operations/OperationTreeBuilderFactory.java    |  44 --
 .../flink/table/plan/QueryOperationConverter.java  |   2 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |  18 +-
 .../operations/OperationTreeBuilderImpl.scala      | 599 ------------------
 .../api/stream/StreamTableEnvironmentTest.scala    |   3 +-
 .../flink/table/api/stream/sql/AggregateTest.scala |   3 +-
 .../apache/flink/table/utils/TableTestBase.scala   |   6 +-
 27 files changed, 817 insertions(+), 826 deletions(-)
 delete mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java
 rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/{ => utils}/OperationExpressionsUtils.java (98%)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
 rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/{ => utils}/QueryOperationDefaultVisitor.java (73%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/AggregateOperationFactory.java (98%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/AliasOperationUtils.java (97%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/CalculatedTableFactory.java (96%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/ColumnOperationUtils.java (97%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/JoinOperationFactory.java (95%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/ProjectionOperationFactory.java (95%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/SetOperationFactory.java (95%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/operations => flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories}/SortOperationFactory.java (96%)
 delete mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala


[flink] 02/03: [FLINK-12906][table-planner][table-api-java] Ported OperationTreeBuilder to table-api-java module

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d167a3670614901e4ef011af92b4045c7eb1612
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Sat Jun 1 19:08:56 2019 +0200

    [FLINK-12906][table-planner][table-api-java] Ported OperationTreeBuilder to table-api-java module
---
 .../java/internal/StreamTableEnvironmentImpl.java  |   8 +-
 .../flink/table/api/EnvironmentSettings.java       |   7 +
 .../table/api/internal/TableEnvironmentImpl.java   |  28 +-
 .../expressions/utils/ApiExpressionUtils.java      |  29 +-
 .../operations/OperationExpressionsUtils.java      |   4 +-
 .../table/operations/OperationTreeBuilder.java     | 680 +++++++++++++++++++--
 .../internal/StreamTableEnvironmentImpl.scala      |  10 +-
 .../flink/table/expressions/ExpressionUtils.java   |   8 +-
 .../operations/OperationTreeBuilderFactory.java    |  44 --
 .../flink/table/api/internal/TableEnvImpl.scala    |  17 +-
 .../operations/OperationTreeBuilderImpl.scala      | 600 ------------------
 .../api/stream/StreamTableEnvironmentTest.scala    |   3 +-
 .../flink/table/api/stream/sql/AggregateTest.scala |   3 +-
 .../apache/flink/table/utils/TableTestBase.scala   |   6 +-
 14 files changed, 702 insertions(+), 745 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 6b37690..05815dd 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -85,8 +85,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			TableConfig tableConfig,
 			StreamExecutionEnvironment executionEnvironment,
 			Planner planner,
-			Executor executor) {
-		super(catalogManager, tableConfig, executor, functionCatalog, planner);
+			Executor executor,
+			boolean isStreaming) {
+		super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreaming);
 		this.executionEnvironment = executionEnvironment;
 	}
 
@@ -119,7 +120,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			tableConfig,
 			executionEnvironment,
 			planner,
-			executor
+			executor,
+			!settings.isBatchMode()
 		);
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 37ba179..70b7ffd 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -114,6 +114,13 @@ public class EnvironmentSettings {
 		return builtInDatabaseName;
 	}
 
+	/**
+	 * Tells if the {@link TableEnvironment} should work in a batch or streaming mode.
+	 */
+	public boolean isBatchMode() {
+		return isBatchMode;
+	}
+
 	@Internal
 	public Map<String, String> toPlannerProperties() {
 		Map<String, String> properties = new HashMap<>(toCommonProperties());
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 727727a..9b04f56 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ExternalCatalog;
 import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -46,7 +45,6 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.descriptors.TableDescriptor;
 import org.apache.flink.table.expressions.TableReferenceExpression;
-import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -60,7 +58,6 @@ import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
 import org.apache.flink.util.StringUtils;
 
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -92,7 +89,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 			TableConfig tableConfig,
 			Executor executor,
 			FunctionCatalog functionCatalog,
-			Planner planner) {
+			Planner planner,
+			boolean isStreaming) {
 		this.catalogManager = catalogManager;
 		this.execEnv = executor;
 
@@ -103,32 +101,16 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 		this.functionCatalog = functionCatalog;
 		this.planner = planner;
-		this.operationTreeBuilder = lookupTreeBuilder(
+		this.operationTreeBuilder = OperationTreeBuilder.create(
+			functionCatalog,
 			path -> {
 				Optional<CatalogQueryOperation> catalogTableOperation = scanInternal(path);
 				return catalogTableOperation.map(tableOperation -> new TableReferenceExpression(path, tableOperation));
 			},
-			functionCatalog
+			isStreaming
 		);
 	}
 
-	private static OperationTreeBuilder lookupTreeBuilder(
-		TableReferenceLookup tableReferenceLookup,
-		FunctionLookup functionDefinitionCatalog) {
-		try {
-			Class<?> clazz = Class.forName("org.apache.flink.table.operations.OperationTreeBuilderFactory");
-			Method createMethod = clazz.getMethod(
-				"create",
-				TableReferenceLookup.class,
-				FunctionLookup.class);
-
-			return (OperationTreeBuilder) createMethod.invoke(null, tableReferenceLookup, functionDefinitionCatalog);
-		} catch (Exception e) {
-			throw new TableException(
-				"Could not instantiate the operation builder. Make sure the planner module is on the classpath");
-		}
-	}
-
 	@VisibleForTesting
 	public Planner getPlanner() {
 		return planner;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java
index f04878f..5d0c2c3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/utils/ApiExpressionUtils.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.types.DataType;
@@ -116,18 +117,34 @@ public final class ApiExpressionUtils {
 	/**
 	 * Checks if the expression is a function call of given type.
 	 *
-	 * @param expr expression to check
+	 * @param expression expression to check
 	 * @param kind expected type of function
 	 * @return true if the expression is function call of given type, false otherwise
 	 */
-	public static boolean isFunctionOfKind(Expression expr, FunctionKind kind) {
-		if (expr instanceof UnresolvedCallExpression) {
-			return ((UnresolvedCallExpression) expr).getFunctionDefinition().getKind() == kind;
+	public static boolean isFunctionOfKind(Expression expression, FunctionKind kind) {
+		if (expression instanceof UnresolvedCallExpression) {
+			return ((UnresolvedCallExpression) expression).getFunctionDefinition().getKind() == kind;
 		}
-		if (expr instanceof CallExpression) {
-			return ((CallExpression) expr).getFunctionDefinition().getKind() == kind;
+		if (expression instanceof CallExpression) {
+			return ((CallExpression) expression).getFunctionDefinition().getKind() == kind;
 		}
 		return false;
+	}
 
+	/**
+	 * Checks if the given expression is a given builtin function.
+	 *
+	 * @param expression expression to check
+	 * @param functionDefinition expected function definition
+	 * @return true if the given expression is a given function call
+	 */
+	public static boolean isFunction(Expression expression, BuiltInFunctionDefinition functionDefinition) {
+		if (expression instanceof UnresolvedCallExpression) {
+			return ((UnresolvedCallExpression) expression).getFunctionDefinition() == functionDefinition;
+		}
+		if (expression instanceof CallExpression) {
+			return ((CallExpression) expression).getFunctionDefinition() == functionDefinition;
+		}
+		return false;
 	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
index 8e9912b..eb2030d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
@@ -155,8 +155,8 @@ public class OperationExpressionsUtils {
 		private final Map<Expression, String> properties;
 
 		private AggregationAndPropertiesReplacer(
-			Map<Expression, String> aggregates,
-			Map<Expression, String> properties) {
+				Map<Expression, String> aggregates,
+				Map<Expression, String> properties) {
 			this.aggregates = aggregates;
 			this.properties = properties;
 		}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java
index 50eb8d9..37de6da 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java
@@ -19,92 +19,676 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.GroupWindow;
 import org.apache.flink.table.api.OverWindow;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionUtils;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.expressions.resolver.LookupCallResolver;
+import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
+import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
+import org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow;
+import org.apache.flink.table.operations.utils.factories.AggregateOperationFactory;
+import org.apache.flink.table.operations.utils.factories.AliasOperationUtils;
+import org.apache.flink.table.operations.utils.factories.CalculatedTableFactory;
+import org.apache.flink.table.operations.utils.factories.ColumnOperationUtils;
+import org.apache.flink.table.operations.utils.factories.JoinOperationFactory;
+import org.apache.flink.table.operations.utils.factories.ProjectionOperationFactory;
+import org.apache.flink.table.operations.utils.factories.SetOperationFactory;
+import org.apache.flink.table.operations.utils.factories.SortOperationFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.typeutils.FieldInfoUtils;
+import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.operations.SetQueryOperation.SetQueryOperationType.INTERSECT;
+import static org.apache.flink.table.operations.SetQueryOperation.SetQueryOperationType.MINUS;
+import static org.apache.flink.table.operations.SetQueryOperation.SetQueryOperationType.UNION;
 
 /**
- * Builder for validated {@link QueryOperation}s.
- *
- * <p>TODO. This is a temporary solution. The actual implementation should be ported.
+ * A builder for constructing validated {@link QueryOperation}s.
  */
 @Internal
-public interface OperationTreeBuilder {
-	QueryOperation project(List<Expression> projectList, QueryOperation child);
+public final class OperationTreeBuilder {
+
+	private final FunctionLookup functionCatalog;
+	private final TableReferenceLookup tableReferenceLookup;
+	private final LookupCallResolver lookupResolver;
+
+	/**
+	 * Utility classes for constructing a validated operation of certain type.
+	 */
+	private final ProjectionOperationFactory projectionOperationFactory;
+	private final SortOperationFactory sortOperationFactory;
+	private final CalculatedTableFactory calculatedTableFactory;
+	private final SetOperationFactory setOperationFactory;
+	private final AggregateOperationFactory aggregateOperationFactory;
+	private final JoinOperationFactory joinOperationFactory;
+
+	private OperationTreeBuilder(
+			FunctionLookup functionLookup,
+			TableReferenceLookup tableReferenceLookup,
+			ProjectionOperationFactory projectionOperationFactory,
+			SortOperationFactory sortOperationFactory,
+			CalculatedTableFactory calculatedTableFactory,
+			SetOperationFactory setOperationFactory,
+			AggregateOperationFactory aggregateOperationFactory,
+			JoinOperationFactory joinOperationFactory) {
+		this.functionCatalog = functionLookup;
+		this.tableReferenceLookup = tableReferenceLookup;
+		this.projectionOperationFactory = projectionOperationFactory;
+		this.sortOperationFactory = sortOperationFactory;
+		this.calculatedTableFactory = calculatedTableFactory;
+		this.setOperationFactory = setOperationFactory;
+		this.aggregateOperationFactory = aggregateOperationFactory;
+		this.joinOperationFactory = joinOperationFactory;
+		this.lookupResolver = new LookupCallResolver(functionLookup);
+	}
+
+	public static OperationTreeBuilder create(
+			FunctionLookup functionCatalog,
+			TableReferenceLookup tableReferenceLookup,
+			boolean isStreaming) {
+		return new OperationTreeBuilder(
+			functionCatalog,
+			tableReferenceLookup,
+			new ProjectionOperationFactory(),
+			new SortOperationFactory(isStreaming),
+			new CalculatedTableFactory(),
+			new SetOperationFactory(isStreaming),
+			new AggregateOperationFactory(isStreaming),
+			new JoinOperationFactory()
+		);
+	}
+
+	public QueryOperation project(List<Expression> projectList, QueryOperation child) {
+		return project(projectList, child, false);
+	}
+
+	public QueryOperation project(List<Expression> projectList, QueryOperation child, boolean explicitAlias) {
+		projectList.forEach(p -> p.accept(new NoAggregateChecker(
+			"Aggregate functions are not supported in the select right after the aggregate" +
+				" or flatAggregate operation.")));
+		projectList.forEach(p -> p.accept(new NoWindowPropertyChecker(
+			"Window properties can only be used on windowed tables.")));
+		return projectInternal(projectList, child, explicitAlias, Collections.emptyList());
+	}
+
+	public QueryOperation project(List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows) {
+
+		Preconditions.checkArgument(!overWindows.isEmpty());
+
+		projectList.forEach(p -> p.accept(new NoWindowPropertyChecker(
+			"Window start and end properties are not available for Over windows.")));
+
+		return projectInternal(
+			projectList,
+			child,
+			true,
+			overWindows);
+	}
+
+	private QueryOperation projectInternal(
+			List<Expression> projectList,
+			QueryOperation child,
+			boolean explicitAlias,
+			List<OverWindow> overWindows) {
+
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, child)
+			.withOverWindows(overWindows)
+			.build();
+		List<ResolvedExpression> projections = resolver.resolve(projectList);
+		return projectionOperationFactory.create(projections, child, explicitAlias, resolver.postResolverFactory());
+	}
+
+	/**
+	 * Adds additional columns. Existing fields will be replaced if replaceIfExist is true.
+	 */
+	public QueryOperation addColumns(boolean replaceIfExist, List<Expression> fieldLists, QueryOperation child) {
+		final List<Expression> newColumns;
+		if (replaceIfExist) {
+			String[] fieldNames = child.getTableSchema().getFieldNames();
+			newColumns = ColumnOperationUtils.addOrReplaceColumns(Arrays.asList(fieldNames), fieldLists);
+		} else {
+			newColumns = new ArrayList<>(fieldLists);
+			newColumns.add(0, new UnresolvedReferenceExpression("*"));
+		}
+		return project(newColumns, child, false);
+	}
+
+	public QueryOperation renameColumns(List<Expression> aliases, QueryOperation child) {
+
+		ExpressionResolver resolver = getResolver(child);
+		String[] inputFieldNames = child.getTableSchema().getFieldNames();
+		List<Expression> validateAliases = ColumnOperationUtils.renameColumns(
+			Arrays.asList(inputFieldNames),
+			resolver.resolveExpanding(aliases));
+
+		return project(validateAliases, child, false);
+	}
+
+	public QueryOperation dropColumns(List<Expression> fieldLists, QueryOperation child) {
+
+		ExpressionResolver resolver = getResolver(child);
+		String[] inputFieldNames = child.getTableSchema().getFieldNames();
+		List<Expression> finalFields = ColumnOperationUtils.dropFields(
+			Arrays.asList(inputFieldNames),
+			resolver.resolveExpanding(fieldLists));
+
+		return project(finalFields, child, false);
+	}
+
+	public QueryOperation aggregate(List<Expression> groupingExpressions, List<Expression> aggregates, QueryOperation child) {
 
-	QueryOperation project(List<Expression> projectList, QueryOperation child, boolean explicitAlias);
+		ExpressionResolver resolver = getResolver(child);
 
-	QueryOperation project(List<Expression> projectList, QueryOperation child, List<OverWindow> overWindows);
+		List<ResolvedExpression> resolvedGroupings = resolver.resolve(groupingExpressions);
+		List<ResolvedExpression> resolvedAggregates = resolver.resolve(aggregates);
 
-	QueryOperation windowAggregate(
+		return aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child);
+	}
+
+	public QueryOperation windowAggregate(
 		List<Expression> groupingExpressions,
 		GroupWindow window,
 		List<Expression> windowProperties,
 		List<Expression> aggregates,
-		QueryOperation child);
+		QueryOperation child) {
 
-	QueryOperation join(
-		QueryOperation left,
-		QueryOperation right,
-		JoinType joinType,
-		Optional<Expression> condition,
-		boolean correlated);
+		ExpressionResolver resolver = getResolver(child);
+		ResolvedGroupWindow resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver);
 
-	QueryOperation joinLateral(
-		QueryOperation left,
-		Expression tableFunction,
-		JoinType joinType,
-		Optional<Expression> condition);
+		ExpressionResolver resolverWithWindowReferences = ExpressionResolver.resolverFor(
+				tableReferenceLookup,
+				functionCatalog,
+				child)
+			.withLocalReferences(
+				new LocalReferenceExpression(
+					resolvedWindow.getAlias(),
+					resolvedWindow.getTimeAttribute().getOutputDataType()))
+			.build();
 
-	Expression resolveExpression(Expression expression, QueryOperation... tableOperation);
+		List<ResolvedExpression> convertedGroupings = resolverWithWindowReferences.resolve(groupingExpressions);
+		List<ResolvedExpression> convertedAggregates = resolverWithWindowReferences.resolve(aggregates);
+		List<ResolvedExpression> convertedProperties = resolverWithWindowReferences.resolve(windowProperties);
 
-	QueryOperation sort(List<Expression> fields, QueryOperation child);
+		return aggregateOperationFactory.createWindowAggregate(
+			convertedGroupings,
+			convertedAggregates,
+			convertedProperties,
+			resolvedWindow,
+			child);
+	}
 
-	QueryOperation limitWithOffset(int offset, QueryOperation child);
+	public QueryOperation join(
+			QueryOperation left,
+			QueryOperation right,
+			JoinType joinType,
+			Optional<Expression> condition,
+			boolean correlated) {
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, left, right)
+			.build();
+		Optional<ResolvedExpression> resolvedCondition = condition.map(expr -> resolveSingleExpression(expr, resolver));
 
-	QueryOperation limitWithFetch(int fetch, QueryOperation child);
+		return joinOperationFactory.create(
+			left,
+			right,
+			joinType,
+			resolvedCondition.orElse(valueLiteral(true)),
+			correlated);
+	}
 
-	QueryOperation alias(List<Expression> fields, QueryOperation child);
+	public QueryOperation joinLateral(
+			QueryOperation left,
+			Expression tableFunction,
+			JoinType joinType,
+			Optional<Expression> condition) {
+		ExpressionResolver resolver = getResolver(left);
+		ResolvedExpression resolvedFunction = resolveSingleExpression(tableFunction, resolver);
 
-	QueryOperation filter(Expression condition, QueryOperation child);
+		QueryOperation temporalTable = calculatedTableFactory.create(
+			resolvedFunction,
+			left.getTableSchema().getFieldNames());
 
-	QueryOperation distinct(QueryOperation child);
+		return join(left, temporalTable, joinType, condition, true);
+	}
 
-	QueryOperation minus(QueryOperation left, QueryOperation right, boolean all);
+	public Expression resolveExpression(Expression expression, QueryOperation... tableOperation) {
+		ExpressionResolver resolver = ExpressionResolver.resolverFor(
+			tableReferenceLookup,
+			functionCatalog,
+			tableOperation).build();
 
-	QueryOperation intersect(QueryOperation left, QueryOperation right, boolean all);
+		return resolveSingleExpression(expression, resolver);
+	}
 
-	QueryOperation union(QueryOperation left, QueryOperation right, boolean all);
+	private ResolvedExpression resolveSingleExpression(Expression expression, ExpressionResolver resolver) {
+		List<ResolvedExpression> resolvedExpression = resolver.resolve(Collections.singletonList(expression));
+		if (resolvedExpression.size() != 1) {
+			throw new ValidationException("Expected single expression");
+		} else {
+			return resolvedExpression.get(0);
+		}
+	}
 
-	/* Extensions */
+	public QueryOperation sort(List<Expression> fields, QueryOperation child) {
 
-	QueryOperation addColumns(boolean replaceIfExist, List<Expression> fieldLists, QueryOperation child);
+		ExpressionResolver resolver = getResolver(child);
+		List<ResolvedExpression> resolvedFields = resolver.resolve(fields);
 
-	QueryOperation renameColumns(List<Expression> aliases, QueryOperation child);
+		return sortOperationFactory.createSort(resolvedFields, child, resolver.postResolverFactory());
+	}
 
-	QueryOperation dropColumns(List<Expression> fieldLists, QueryOperation child);
+	public QueryOperation limitWithOffset(int offset, QueryOperation child) {
+		return sortOperationFactory.createLimitWithOffset(offset, child);
+	}
 
-	QueryOperation aggregate(List<Expression> groupingExpressions, List<Expression> aggregates, QueryOperation child);
+	public QueryOperation limitWithFetch(int fetch, QueryOperation child) {
+		return sortOperationFactory.createLimitWithFetch(fetch, child);
+	}
 
-	QueryOperation map(Expression mapFunction, QueryOperation child);
+	public QueryOperation alias(List<Expression> fields, QueryOperation child) {
+		List<Expression> newFields = AliasOperationUtils.createAliasList(fields, child);
 
-	QueryOperation flatMap(Expression tableFunction, QueryOperation child);
+		return project(newFields, child, true);
+	}
 
-	QueryOperation aggregate(List<Expression> groupingExpressions, Expression aggregate, QueryOperation child);
+	public QueryOperation filter(Expression condition, QueryOperation child) {
 
-	QueryOperation tableAggregate(
-		List<Expression> groupingExpressions,
-		Expression tableAggFunction,
-		QueryOperation child);
+		ExpressionResolver resolver = getResolver(child);
+		ResolvedExpression resolvedExpression = resolveSingleExpression(condition, resolver);
+		DataType conditionType = resolvedExpression.getOutputDataType();
+		if (!LogicalTypeChecks.hasRoot(conditionType.getLogicalType(), LogicalTypeRoot.BOOLEAN)) {
+			throw new ValidationException("Filter operator requires a boolean expression as input," +
+				" but $condition is of type " + conditionType);
+		}
 
-	QueryOperation windowTableAggregate(
-		List<Expression> groupingExpressions,
-		GroupWindow window,
-		List<Expression> windowProperties,
-		Expression tableAggFunction,
-		QueryOperation child);
+		return new FilterQueryOperation(resolvedExpression, child);
+	}
+
+	public QueryOperation distinct(QueryOperation child) {
+		return new DistinctQueryOperation(child);
+	}
+
+	public QueryOperation minus(QueryOperation left, QueryOperation right, boolean all) {
+		return setOperationFactory.create(MINUS, left, right, all);
+	}
+
+	public QueryOperation intersect(QueryOperation left, QueryOperation right, boolean all) {
+		return setOperationFactory.create(INTERSECT, left, right, all);
+	}
+
+	public QueryOperation union(QueryOperation left, QueryOperation right, boolean all) {
+		return setOperationFactory.create(UNION, left, right, all);
+	}
+
+	public QueryOperation map(Expression mapFunction, QueryOperation child) {
+
+		Expression resolvedMapFunction = mapFunction.accept(lookupResolver);
+
+		if (!ApiExpressionUtils.isFunctionOfKind(resolvedMapFunction, FunctionKind.SCALAR)) {
+			throw new ValidationException("Only a scalar function can be used in the map operator.");
+		}
+
+		Expression expandedFields = unresolvedCall(BuiltInFunctionDefinitions.FLATTEN, resolvedMapFunction);
+		return project(Collections.singletonList(expandedFields), child, false);
+	}
+
+	public QueryOperation flatMap(Expression tableFunction, QueryOperation child) {
+
+		Expression resolvedTableFunction = tableFunction.accept(lookupResolver);
+
+		if (!ApiExpressionUtils.isFunctionOfKind(resolvedTableFunction, FunctionKind.TABLE)) {
+			throw new ValidationException("Only a table function can be used in the flatMap operator.");
+		}
+
+		TypeInformation<?> resultType = ((TableFunctionDefinition) ((UnresolvedCallExpression) resolvedTableFunction)
+			.getFunctionDefinition())
+			.getResultType();
+		List<String> originFieldNames = Arrays.asList(FieldInfoUtils.getFieldNames(resultType));
+
+		List<String> childFields = Arrays.asList(child.getTableSchema().getFieldNames());
+		Set<String> usedFieldNames = new HashSet<>(childFields);
+
+		List<Expression> args = new ArrayList<>();
+		for (String originFieldName : originFieldNames) {
+			String resultName = getUniqueName(originFieldName, usedFieldNames);
+			usedFieldNames.add(resultName);
+			args.add(valueLiteral(resultName));
+		}
+
+		args.add(0, resolvedTableFunction);
+		Expression renamedTableFunction = unresolvedCall(
+			BuiltInFunctionDefinitions.AS,
+			args.toArray(new Expression[0]));
+		QueryOperation joinNode = joinLateral(child, renamedTableFunction, JoinType.INNER, Optional.empty());
+		QueryOperation rightNode = dropColumns(
+			childFields.stream().map(UnresolvedReferenceExpression::new).collect(Collectors.toList()),
+			joinNode);
+		return alias(
+			originFieldNames.stream().map(UnresolvedReferenceExpression::new).collect(Collectors.toList()),
+			rightNode);
+	}
+
+	public QueryOperation aggregate(List<Expression> groupingExpressions, Expression aggregate, QueryOperation child) {
+		Expression resolvedAggregate = aggregate.accept(lookupResolver);
+		AggregateWithAlias aggregateWithAlias = resolvedAggregate.accept(new ExtractAliasAndAggregate());
+
+		// turn agg to a named agg, because it will be verified later.
+		String[] childNames = child.getTableSchema().getFieldNames();
+		Expression aggregateRenamed = addAliasToTheCallInGroupings(
+			Arrays.asList(childNames),
+			Collections.singletonList(aggregateWithAlias.aggregate)).get(0);
+
+		// get agg table
+		QueryOperation aggregateOperation = this.aggregate(
+			groupingExpressions,
+			Collections.singletonList(aggregateRenamed),
+			child);
+
+		// flatten the aggregate function
+		String[] aggNames = aggregateOperation.getTableSchema().getFieldNames();
+		List<Expression> flattenedExpressions = Arrays.asList(aggNames)
+			.subList(0, groupingExpressions.size())
+			.stream()
+			.map(ApiExpressionUtils::unresolvedRef)
+			.collect(Collectors.toCollection(ArrayList::new));
+
+		flattenedExpressions.add(unresolvedCall(
+			BuiltInFunctionDefinitions.FLATTEN,
+			unresolvedRef(aggNames[aggNames.length - 1])));
+
+		QueryOperation flattenedProjection = this.project(flattenedExpressions, aggregateOperation);
+
+		// add alias
+		return aliasBackwardFields(flattenedProjection, aggregateWithAlias.aliases, groupingExpressions.size());
+	}
+
+	private static class AggregateWithAlias {
+		private final UnresolvedCallExpression aggregate;
+		private final List<String> aliases;
+
+		private AggregateWithAlias(UnresolvedCallExpression aggregate, List<String> aliases) {
+			this.aggregate = aggregate;
+			this.aliases = aliases;
+		}
+	}
+
+	private static class ExtractAliasAndAggregate extends ApiExpressionDefaultVisitor<AggregateWithAlias> {
+		@Override
+		public AggregateWithAlias visit(UnresolvedCallExpression unresolvedCall) {
+			if (ApiExpressionUtils.isFunction(unresolvedCall, BuiltInFunctionDefinitions.AS)) {
+				Expression expression = unresolvedCall.getChildren().get(0);
+				if (expression instanceof UnresolvedCallExpression) {
+					List<String> aliases = extractAliases(unresolvedCall);
+
+					return getAggregate((UnresolvedCallExpression) expression, aliases)
+						.orElseGet(() -> defaultMethod(unresolvedCall));
+				} else {
+					return defaultMethod(unresolvedCall);
+				}
+			}
+
+			return getAggregate(unresolvedCall, Collections.emptyList()).orElseGet(() -> defaultMethod(unresolvedCall));
+		}
+
+		private List<String> extractAliases(UnresolvedCallExpression unresolvedCall) {
+			return unresolvedCall.getChildren()
+								.subList(1, unresolvedCall.getChildren().size())
+								.stream()
+								.map(ex -> ExpressionUtils.extractValue(ex, String.class)
+									.orElseThrow(() -> new TableException("Expected string literal as alias.")))
+								.collect(Collectors.toList());
+		}
+
+		private Optional<AggregateWithAlias> getAggregate(
+				UnresolvedCallExpression unresolvedCall,
+				List<String> aliases) {
+			FunctionDefinition functionDefinition = unresolvedCall.getFunctionDefinition();
+			if (ApiExpressionUtils.isFunctionOfKind(unresolvedCall, FunctionKind.AGGREGATE)) {
+				final List<String> fieldNames;
+				if (aliases.isEmpty()) {
+					if (functionDefinition instanceof AggregateFunctionDefinition) {
+						TypeInformation<?> resultTypeInfo = ((AggregateFunctionDefinition) functionDefinition)
+							.getResultTypeInfo();
+						fieldNames = Arrays.asList(FieldInfoUtils.getFieldNames(resultTypeInfo));
+					} else {
+						fieldNames = Collections.emptyList();
+					}
+				} else {
+					fieldNames = aliases;
+				}
+				return Optional.of(new AggregateWithAlias(unresolvedCall, fieldNames));
+			} else {
+				return Optional.empty();
+			}
+		}
+
+		@Override
+		protected AggregateWithAlias defaultMethod(Expression expression) {
+			throw new ValidationException("Aggregate function expected. Got: " + expression);
+		}
+	}
+
+	public QueryOperation tableAggregate(
+			List<Expression> groupingExpressions,
+			Expression tableAggFunction,
+			QueryOperation child) {
+
+		// Step1: add a default name to the call in the grouping expressions, e.g., groupBy(a % 5) to
+		// groupBy(a % 5 as TMP_0). We need a name for every column so that to perform alias for the
+		// table aggregate function in Step4.
+		List<Expression> newGroupingExpressions = addAliasToTheCallInGroupings(
+			Arrays.asList(child.getTableSchema().getFieldNames()),
+			groupingExpressions);
+
+		// Step2: resolve expressions
+		ExpressionResolver resolver = getResolver(child);
+		List<ResolvedExpression> resolvedGroupings = resolver.resolve(newGroupingExpressions);
+		Tuple2<ResolvedExpression, List<String>> resolvedFunctionAndAlias =
+			aggregateOperationFactory.extractTableAggFunctionAndAliases(
+				resolveSingleExpression(tableAggFunction, resolver));
+
+		// Step3: create table agg operation
+		QueryOperation tableAggOperation = aggregateOperationFactory
+			.createAggregate(resolvedGroupings, Collections.singletonList(resolvedFunctionAndAlias.f0), child);
+
+		// Step4: add a top project to alias the output fields of the table aggregate.
+		return aliasBackwardFields(tableAggOperation, resolvedFunctionAndAlias.f1, groupingExpressions.size());
+	}
+
+	public QueryOperation windowTableAggregate(
+			List<Expression> groupingExpressions,
+			GroupWindow window,
+			List<Expression> windowProperties,
+			Expression tableAggFunction,
+			QueryOperation child) {
+
+		// Step1: add a default name to the call in the grouping expressions, e.g., groupBy(a % 5) to
+		// groupBy(a % 5 as TMP_0). We need a name for every column so that to perform alias for the
+		// table aggregate function in Step4.
+		List<Expression> newGroupingExpressions = addAliasToTheCallInGroupings(
+			Arrays.asList(child.getTableSchema().getFieldNames()),
+			groupingExpressions);
+
+		// Step2: resolve expressions, including grouping, aggregates and window properties.
+		ExpressionResolver resolver = getResolver(child);
+		ResolvedGroupWindow resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver);
+
+		ExpressionResolver resolverWithWindowReferences = ExpressionResolver.resolverFor(
+			tableReferenceLookup,
+			functionCatalog,
+			child)
+			.withLocalReferences(
+				new LocalReferenceExpression(
+					resolvedWindow.getAlias(),
+					resolvedWindow.getTimeAttribute().getOutputDataType()))
+			.build();
+
+		List<ResolvedExpression> convertedGroupings = resolverWithWindowReferences.resolve(newGroupingExpressions);
+		List<ResolvedExpression> convertedAggregates = resolverWithWindowReferences.resolve(Collections.singletonList(
+			tableAggFunction));
+		List<ResolvedExpression> convertedProperties = resolverWithWindowReferences.resolve(windowProperties);
+		Tuple2<ResolvedExpression, List<String>> resolvedFunctionAndAlias = aggregateOperationFactory
+			.extractTableAggFunctionAndAliases(convertedAggregates.get(0));
+
+		// Step3: create window table agg operation
+		QueryOperation tableAggOperation = aggregateOperationFactory.createWindowAggregate(
+			convertedGroupings,
+			Collections.singletonList(resolvedFunctionAndAlias.f0),
+			convertedProperties,
+			resolvedWindow,
+			child);
+
+		// Step4: add a top project to alias the output fields of the table aggregate. Also, project the
+		// window attribute.
+		return aliasBackwardFields(tableAggOperation, resolvedFunctionAndAlias.f1, groupingExpressions.size());
+	}
+
+	/**
+	 * Rename fields in the input {@link QueryOperation}.
+	 */
+	private QueryOperation aliasBackwardFields(
+		QueryOperation inputOperation,
+		List<String> alias,
+		int aliasStartIndex) {
+
+		if (!alias.isEmpty()) {
+			String[] namesBeforeAlias = inputOperation.getTableSchema().getFieldNames();
+			List<String> namesAfterAlias = new ArrayList<>(Arrays.asList(namesBeforeAlias));
+			for (int i = 0; i < alias.size(); i++) {
+				int withOffset = aliasStartIndex + i;
+				namesAfterAlias.remove(withOffset);
+				namesAfterAlias.add(withOffset, alias.get(i));
+			}
+
+			return this.alias(namesAfterAlias.stream()
+				.map(UnresolvedReferenceExpression::new)
+				.collect(Collectors.toList()), inputOperation);
+		} else {
+			return inputOperation;
+		}
+	}
+
+	/**
+	 * Add a default name to the call in the grouping expressions, e.g., groupBy(a % 5) to
+	 * groupBy(a % 5 as TMP_0).
+	 */
+	private List<Expression> addAliasToTheCallInGroupings(
+		List<String> inputFieldNames,
+		List<Expression> groupingExpressions) {
+
+		int attrNameCntr = 0;
+		Set<String> usedFieldNames = new HashSet<>(inputFieldNames);
+
+		List<Expression> result = new ArrayList<>();
+		for (Expression groupingExpression : groupingExpressions) {
+			if (groupingExpression instanceof UnresolvedCallExpression &&
+				!ApiExpressionUtils.isFunction(groupingExpression, BuiltInFunctionDefinitions.AS)) {
+				String tempName = getUniqueName("TMP_" + attrNameCntr, usedFieldNames);
+				attrNameCntr += 1;
+				usedFieldNames.add(tempName);
+				result.add(unresolvedCall(
+					BuiltInFunctionDefinitions.AS,
+					groupingExpression,
+					valueLiteral(tempName)));
+			} else {
+				result.add(groupingExpression);
+			}
+		}
+
+		return result;
+	}
+
+	/**
+	 * Return a unique name that does not exist in usedFieldNames according to the input name.
+	 */
+	private String getUniqueName(String inputName, Collection<String> usedFieldNames) {
+		int i = 0;
+		String resultName = inputName;
+		while (usedFieldNames.contains(resultName)) {
+			resultName = resultName + "_" + i;
+			i += 1;
+		}
+		return resultName;
+	}
+
+	private ExpressionResolver getResolver(QueryOperation child) {
+		return ExpressionResolver.resolverFor(tableReferenceLookup, functionCatalog, child).build();
+	}
+
+	private static class NoWindowPropertyChecker extends ApiExpressionDefaultVisitor<Void> {
+		private final String exceptionMessage;
+
+		private NoWindowPropertyChecker(String exceptionMessage) {
+			this.exceptionMessage = exceptionMessage;
+		}
+
+		@Override
+		public Void visit(UnresolvedCallExpression call) {
+			FunctionDefinition functionDefinition = call.getFunctionDefinition();
+			if (BuiltInFunctionDefinitions.WINDOW_PROPERTIES.contains(functionDefinition)) {
+				throw new ValidationException(exceptionMessage);
+			}
+			call.getChildren().forEach(expr -> expr.accept(this));
+			return null;
+		}
+
+		@Override
+		protected Void defaultMethod(Expression expression) {
+			return null;
+		}
+	}
+
+	private static class NoAggregateChecker extends ApiExpressionDefaultVisitor<Void> {
+		private final String exceptionMessage;
+
+		private NoAggregateChecker(String exceptionMessage) {
+			this.exceptionMessage = exceptionMessage;
+		}
+
+		@Override
+		public Void visit(UnresolvedCallExpression call) {
+			if (ApiExpressionUtils.isFunctionOfKind(call, FunctionKind.AGGREGATE)) {
+				throw new ValidationException(exceptionMessage);
+			}
+			call.getChildren().forEach(expr -> expr.accept(this));
+			return null;
+		}
+
+		@Override
+		protected Void defaultMethod(Expression expression) {
+			return null;
+		}
+	}
 }
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 96b7403..990f043 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -55,13 +55,15 @@ class StreamTableEnvironmentImpl (
     config: TableConfig,
     scalaExecutionEnvironment: StreamExecutionEnvironment,
     planner: Planner,
-    executor: Executor)
+    executor: Executor,
+    isStreaming: Boolean)
   extends TableEnvironmentImpl(
     catalogManager,
     config,
     executor,
     functionCatalog,
-    planner)
+    planner,
+    isStreaming)
   with org.apache.flink.table.api.scala.StreamTableEnvironment {
 
   override def fromDataStream[T](dataStream: DataStream[T]): Table = {
@@ -262,7 +264,9 @@ object StreamTableEnvironmentImpl {
       tableConfig,
       executionEnvironment,
       planner,
-      executor)
+      executor,
+      !settings.isBatchMode
+    )
   }
 
   private def lookupExecutor(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java
index 90aece5..951be99 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ExpressionUtils.java
@@ -32,14 +32,14 @@ public final class ExpressionUtils {
 	 * Extracts the value (excluding null) of a given class from an expression assuming it is a
 	 * {@link ValueLiteralExpression}.
 	 *
-	 * @param expr literal to extract the value from
+	 * @param expression literal to extract the value from
 	 * @param targetClass expected class to extract from the literal
 	 * @param <V> type of extracted value
 	 * @return extracted value or empty if could not extract value of given type
 	 */
-	public static <V> Optional<V> extractValue(Expression expr, Class<V> targetClass) {
-		if (expr instanceof ValueLiteralExpression) {
-			final ValueLiteralExpression valueLiteral = (ValueLiteralExpression) expr;
+	public static <V> Optional<V> extractValue(Expression expression, Class<V> targetClass) {
+		if (expression instanceof ValueLiteralExpression) {
+			final ValueLiteralExpression valueLiteral = (ValueLiteralExpression) expression;
 			return valueLiteral.getValueAs(targetClass);
 		}
 		return Optional.empty();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java
deleted file mode 100644
index 1a01307..0000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/OperationTreeBuilderFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.operations;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.FunctionLookup;
-import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup;
-
-/**
- * Temporary solution for looking up the {@link OperationTreeBuilder}. The tree builder
- * should be moved to api module once the type inference is in place.
- */
-@Internal
-public final class OperationTreeBuilderFactory {
-
-	public static OperationTreeBuilder create(
-			TableReferenceLookup tableReferenceLookup,
-			FunctionLookup functionLookup) {
-		return new OperationTreeBuilderImpl(
-			tableReferenceLookup,
-			functionLookup,
-			true
-		);
-	}
-
-	private OperationTreeBuilderFactory() {
-	}
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index efa29ed..0041141 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -18,12 +18,6 @@
 
 package org.apache.flink.table.api.internal
 
-import _root_.java.util.Optional
-
-import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
-import org.apache.calcite.sql._
-import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.tools.FrameworkConfig
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api._
@@ -40,6 +34,13 @@ import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.util.JavaScalaConversionUtil
 import org.apache.flink.util.StringUtils
 
+import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.parser.SqlParser
+import org.apache.calcite.tools.FrameworkConfig
+
+import _root_.java.util.Optional
+
 import _root_.scala.collection.JavaConverters._
 
 /**
@@ -73,9 +74,9 @@ abstract class TableEnvImpl(
     }
   }
 
-  private[flink] val operationTreeBuilder = new OperationTreeBuilderImpl(
-    tableLookup,
+  private[flink] val operationTreeBuilder = OperationTreeBuilder.create(
     functionCatalog,
+    tableLookup,
     !isBatch)
 
   protected val planningConfigurationBuilder: PlanningConfigurationBuilder =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala
deleted file mode 100644
index 03a9bc3..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala
+++ /dev/null
@@ -1,600 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.operations
-
-import java.util.{Collections, Optional, List => JList}
-import org.apache.flink.table.api._
-import org.apache.flink.table.catalog.FunctionLookup
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.expressions.resolver.ExpressionResolver.resolverFor
-import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
-import org.apache.flink.table.expressions.resolver.{ExpressionResolver, LookupCallResolver}
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{isFunctionOfKind, unresolvedCall, unresolvedRef, valueLiteral}
-import org.apache.flink.table.expressions.utils.{ApiExpressionDefaultVisitor, ApiExpressionUtils}
-import org.apache.flink.table.functions.FunctionKind.{SCALAR, TABLE}
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-import org.apache.flink.table.functions.{AggregateFunctionDefinition, BuiltInFunctionDefinitions, TableFunctionDefinition}
-import org.apache.flink.table.operations.JoinQueryOperation.JoinType
-import org.apache.flink.table.operations.OperationExpressionsUtils.extractAggregationsAndProperties
-import org.apache.flink.table.operations.SetQueryOperation.SetQueryOperationType._
-import org.apache.flink.table.operations.utils.factories.AliasOperationUtils.createAliasList
-import org.apache.flink.table.operations.utils.factories._
-import org.apache.flink.table.types.logical.LogicalTypeRoot
-import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
-import org.apache.flink.table.util.JavaScalaConversionUtil.toScala
-import org.apache.flink.util.Preconditions
-
-import _root_.scala.collection.JavaConversions._
-import _root_.scala.collection.JavaConverters._
-
-/**
-  * Builder for [[[Operation]] tree.
-  *
-  * The operation tree builder resolves expressions such that factories only work with fully
-  * [[ResolvedExpression]]s.
-  */
-class OperationTreeBuilderImpl(
-    tableCatalog: TableReferenceLookup,
-    functionCatalog: FunctionLookup,
-    isStreaming: Boolean)
-  extends OperationTreeBuilder{
-
-  private val lookupResolver = new LookupCallResolver(functionCatalog)
-  private val projectionOperationFactory = new ProjectionOperationFactory()
-  private val sortOperationFactory = new SortOperationFactory(isStreaming)
-  private val calculatedTableFactory = new CalculatedTableFactory()
-  private val setOperationFactory = new SetOperationFactory(isStreaming)
-  private val aggregateOperationFactory = new AggregateOperationFactory(isStreaming)
-  private val joinOperationFactory = new JoinOperationFactory()
-
-  private val noWindowPropertyChecker = new NoWindowPropertyChecker(
-    "Window start and end properties are not available for Over windows.")
-
-  override def project(
-      projectList: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-    project(projectList, child, explicitAlias = false)
-  }
-
-  override def project(
-      projectList: JList[Expression],
-      child: QueryOperation,
-      explicitAlias: Boolean)
-    : QueryOperation = {
-    projectInternal(projectList, child, explicitAlias, Collections.emptyList())
-  }
-
-  override def project(
-      projectList: JList[Expression],
-      child: QueryOperation,
-      overWindows: JList[OverWindow])
-    : QueryOperation = {
-
-    Preconditions.checkArgument(!overWindows.isEmpty)
-
-    projectList.asScala.map(_.accept(noWindowPropertyChecker))
-
-    projectInternal(projectList,
-      child,
-      explicitAlias = true,
-      overWindows)
-  }
-
-  private def projectInternal(
-      projectList: JList[Expression],
-      child: QueryOperation,
-      explicitAlias: Boolean,
-      overWindows: JList[OverWindow])
-    : QueryOperation = {
-
-    validateProjectList(projectList, overWindows)
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child)
-      .withOverWindows(overWindows)
-      .build
-    val projections = resolver.resolve(projectList)
-    projectionOperationFactory.create(
-      projections,
-      child,
-      explicitAlias,
-      resolver.postResolverFactory())
-  }
-
-  /**
-    * Window properties and aggregate function should not exist in the plain project, i.e., window
-    * properties should exist in the window operators and aggregate functions should exist in
-    * the aggregate operators.
-    */
-  private def validateProjectList(
-      projectList: JList[Expression],
-      overWindows: JList[OverWindow])
-    : Unit = {
-
-    val expressionsWithResolvedCalls = projectList.map(_.accept(lookupResolver)).asJava
-    val extracted = extractAggregationsAndProperties(expressionsWithResolvedCalls)
-    if (!extracted.getWindowProperties.isEmpty) {
-      throw new ValidationException("Window properties can only be used on windowed tables.")
-    }
-
-    // aggregate functions can't exist in the plain project except for the over window case
-    if (!extracted.getAggregations.isEmpty && overWindows.isEmpty) {
-      throw new ValidationException("Aggregate functions are not supported in the select right" +
-        " after the aggregate or flatAggregate operation.")
-    }
-  }
-
-  /**
-    * Adds additional columns. Existing fields will be replaced if replaceIfExist is true.
-    */
-  override def addColumns(
-      replaceIfExist: Boolean,
-      fieldLists: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-    val newColumns = if (replaceIfExist) {
-      val fieldNames = child.getTableSchema.getFieldNames.toList.asJava
-      ColumnOperationUtils.addOrReplaceColumns(fieldNames, fieldLists)
-    } else {
-      (unresolvedRef("*") +: fieldLists.asScala).asJava
-    }
-    project(newColumns, child)
-  }
-
-  override def renameColumns(
-      aliases: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child)
-      .build
-
-    val inputFieldNames = child.getTableSchema.getFieldNames.toList.asJava
-    val validateAliases = ColumnOperationUtils.renameColumns(
-      inputFieldNames,
-      resolver.resolveExpanding(aliases))
-
-    project(validateAliases, child)
-  }
-
-  override def dropColumns(
-      fieldList: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child)
-      .build
-
-    val inputFieldNames = child.getTableSchema.getFieldNames.toList.asJava
-    val finalFields = ColumnOperationUtils.dropFields(
-      inputFieldNames,
-      resolver.resolveExpanding(fieldList))
-
-    project(finalFields, child)
-  }
-
-  override def aggregate(
-      groupingExpressions: JList[Expression],
-      aggregates: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build
-
-    val resolvedGroupings = resolver.resolve(groupingExpressions)
-    val resolvedAggregates = resolver.resolve(aggregates)
-
-    aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child)
-  }
-
-  /**
-    * Row based aggregate that will flatten the output if it is a composite type.
-    */
-  override def aggregate(
-      groupingExpressions: JList[Expression],
-      aggregate: Expression,
-      child: QueryOperation)
-    : QueryOperation = {
-    val resolvedAggregate = aggregate.accept(lookupResolver)
-
-    // extract alias and aggregate function
-    var alias: Seq[String] = Seq()
-    val aggWithoutAlias = resolvedAggregate match {
-      case c: UnresolvedCallExpression
-          if c.getFunctionDefinition == BuiltInFunctionDefinitions.AS =>
-        alias = c.getChildren
-          .drop(1)
-          .map(e => ExpressionUtils.extractValue(e, classOf[String]).get())
-        c.getChildren.get(0)
-      case c: UnresolvedCallExpression
-        if c.getFunctionDefinition.isInstanceOf[AggregateFunctionDefinition] =>
-        if (alias.isEmpty) alias = UserDefinedFunctionUtils.getFieldInfo(
-          c.getFunctionDefinition.asInstanceOf[AggregateFunctionDefinition].getResultTypeInfo)._1
-        c
-      case e => e
-    }
-
-    // turn agg to a named agg, because it will be verified later.
-    var cnt = 0
-    val childNames = child.getTableSchema.getFieldNames
-    while (childNames.contains("TMP_" + cnt)) {
-      cnt += 1
-    }
-    val aggWithNamedAlias = unresolvedCall(
-      BuiltInFunctionDefinitions.AS,
-      aggWithoutAlias,
-      valueLiteral("TMP_" + cnt))
-
-    // get agg table
-    val aggQueryOperation = this.aggregate(groupingExpressions, Seq(aggWithNamedAlias), child)
-
-    // flatten the aggregate function
-    val aggNames = aggQueryOperation.getTableSchema.getFieldNames
-    val flattenExpressions = aggNames.take(groupingExpressions.size())
-      .map(e => unresolvedRef(e)) ++
-      Seq(unresolvedCall(BuiltInFunctionDefinitions.FLATTEN, unresolvedRef(aggNames.last)))
-    val flattenedOperation = this.project(flattenExpressions.toList, aggQueryOperation)
-
-    // add alias
-    aliasBackwardFields(flattenedOperation, alias, groupingExpressions.size())
-  }
-
-  override def tableAggregate(
-      groupingExpressions: JList[Expression],
-      tableAggFunction: Expression,
-      child: QueryOperation)
-    : QueryOperation = {
-
-    // Step1: add a default name to the call in the grouping expressions, e.g., groupBy(a % 5) to
-    // groupBy(a % 5 as TMP_0). We need a name for every column so that to perform alias for the
-    // table aggregate function in Step4.
-    val newGroupingExpressions = addAliasToTheCallInGroupings(
-      child.getTableSchema.getFieldNames,
-      groupingExpressions)
-
-    // Step2: resolve expressions
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build
-    val resolvedGroupings = resolver.resolve(newGroupingExpressions)
-    val resolvedFunctionAndAlias = aggregateOperationFactory.extractTableAggFunctionAndAliases(
-      resolveSingleExpression(tableAggFunction, resolver))
-
-    // Step3: create table agg operation
-    val tableAggOperation = aggregateOperationFactory
-      .createAggregate(resolvedGroupings, Seq(resolvedFunctionAndAlias.f0), child)
-
-    // Step4: add a top project to alias the output fields of the table aggregate.
-    aliasBackwardFields(tableAggOperation, resolvedFunctionAndAlias.f1, groupingExpressions.size())
-  }
-
-  override def windowAggregate(
-      groupingExpressions: JList[Expression],
-      window: GroupWindow,
-      windowProperties: JList[Expression],
-      aggregates: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
-    val resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver)
-
-    val resolverWithWindowReferences = resolverFor(tableCatalog, functionCatalog, child)
-      .withLocalReferences(
-        new LocalReferenceExpression(
-          resolvedWindow.getAlias,
-          resolvedWindow.getTimeAttribute.getOutputDataType))
-      .build
-
-    val convertedGroupings = resolverWithWindowReferences.resolve(groupingExpressions)
-
-    val convertedAggregates = resolverWithWindowReferences.resolve(aggregates)
-
-    val convertedProperties = resolverWithWindowReferences.resolve(windowProperties)
-
-    aggregateOperationFactory.createWindowAggregate(
-      convertedGroupings,
-      convertedAggregates,
-      convertedProperties,
-      resolvedWindow,
-      child)
-  }
-
-  override def windowTableAggregate(
-    groupingExpressions: JList[Expression],
-    window: GroupWindow,
-    windowProperties: JList[Expression],
-    tableAggFunction: Expression,
-    child: QueryOperation)
-  : QueryOperation = {
-
-    // Step1: add a default name to the call in the grouping expressions, e.g., groupBy(a % 5) to
-    // groupBy(a % 5 as TMP_0). We need a name for every column so that to perform alias for the
-    // table aggregate function in Step4.
-    val newGroupingExpressions = addAliasToTheCallInGroupings(
-      child.getTableSchema.getFieldNames,
-      groupingExpressions)
-
-    // Step2: resolve expressions, including grouping, aggregates and window properties.
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
-    val resolvedWindow = aggregateOperationFactory.createResolvedWindow(window, resolver)
-
-    val resolverWithWindowReferences = resolverFor(tableCatalog, functionCatalog, child)
-      .withLocalReferences(
-        new LocalReferenceExpression(
-          resolvedWindow.getAlias,
-          resolvedWindow.getTimeAttribute.getOutputDataType))
-      .build
-
-    val convertedGroupings = resolverWithWindowReferences.resolve(newGroupingExpressions)
-    val convertedAggregates = resolverWithWindowReferences.resolve(Seq(tableAggFunction))
-    val convertedProperties = resolverWithWindowReferences.resolve(windowProperties)
-    val resolvedFunctionAndAlias = aggregateOperationFactory.extractTableAggFunctionAndAliases(
-      convertedAggregates.get(0))
-
-    // Step3: create window table agg operation
-    val tableAggOperation = aggregateOperationFactory.createWindowAggregate(
-      convertedGroupings,
-      Seq(resolvedFunctionAndAlias.f0),
-      convertedProperties,
-      resolvedWindow,
-      child)
-
-    // Step4: add a top project to alias the output fields of the table aggregate. Also, project the
-    // window attribute.
-    aliasBackwardFields(tableAggOperation, resolvedFunctionAndAlias.f1, groupingExpressions.size())
-  }
-
-  override def join(
-      left: QueryOperation,
-      right: QueryOperation,
-      joinType: JoinType,
-      condition: Optional[Expression],
-      correlated: Boolean)
-    : QueryOperation = {
-    val resolver = resolverFor(tableCatalog, functionCatalog, left, right).build()
-    val resolvedCondition = toScala(condition).map(expr => resolveSingleExpression(expr, resolver))
-
-    joinOperationFactory
-      .create(left, right, joinType, resolvedCondition.getOrElse(valueLiteral(true)), correlated)
-  }
-
-  override def joinLateral(
-      left: QueryOperation,
-      tableFunction: Expression,
-      joinType: JoinType,
-      condition: Optional[Expression])
-    : QueryOperation = {
-    val resolver = resolverFor(tableCatalog, functionCatalog, left).build()
-    val resolvedFunction = resolveSingleExpression(tableFunction, resolver)
-
-    val temporalTable =
-      calculatedTableFactory.create(resolvedFunction, left.getTableSchema.getFieldNames)
-
-    join(left, temporalTable, joinType, condition, correlated = true)
-  }
-
-  override def resolveExpression(expression: Expression, queryOperation: QueryOperation*)
-    : Expression = {
-    val resolver = resolverFor(tableCatalog, functionCatalog, queryOperation: _*).build()
-
-    resolveSingleExpression(expression, resolver)
-  }
-
-  private def resolveSingleExpression(
-      expression: Expression,
-      resolver: ExpressionResolver)
-    : ResolvedExpression = {
-    val resolvedExpression = resolver.resolve(List(expression).asJava)
-    if (resolvedExpression.size() != 1) {
-      throw new ValidationException("Expected single expression")
-    } else {
-      resolvedExpression.get(0)
-    }
-  }
-
-  override def sort(
-      fields: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
-    val resolvedFields = resolver.resolve(fields)
-
-    sortOperationFactory.createSort(resolvedFields, child, resolver.postResolverFactory())
-  }
-
-  override def limitWithOffset(offset: Int, child: QueryOperation): QueryOperation = {
-    sortOperationFactory.createLimitWithOffset(offset, child)
-  }
-
-  override def limitWithFetch(fetch: Int, child: QueryOperation): QueryOperation = {
-    sortOperationFactory.createLimitWithFetch(fetch, child)
-  }
-
-  override def alias(
-      fields: JList[Expression],
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val newFields = createAliasList(fields, child)
-
-    project(newFields, child, explicitAlias = true)
-  }
-
-  override def filter(
-      condition: Expression,
-      child: QueryOperation)
-    : QueryOperation = {
-
-    val resolver = resolverFor(tableCatalog, functionCatalog, child).build()
-    val resolvedExpression = resolveSingleExpression(condition, resolver)
-    val conditionType = resolvedExpression.getOutputDataType
-    if (!LogicalTypeChecks.hasRoot(conditionType.getLogicalType, LogicalTypeRoot.BOOLEAN)) {
-      throw new ValidationException(s"Filter operator requires a boolean expression as input," +
-        s" but $condition is of type ${conditionType}")
-    }
-
-    new FilterQueryOperation(resolvedExpression, child)
-  }
-
-  override def distinct(
-      child: QueryOperation)
-    : QueryOperation = {
-    new DistinctQueryOperation(child)
-  }
-
-  override def minus(
-      left: QueryOperation,
-      right: QueryOperation,
-      all: Boolean)
-    : QueryOperation = {
-    setOperationFactory.create(MINUS, left, right, all)
-  }
-
-  override def intersect(
-      left: QueryOperation,
-      right: QueryOperation,
-      all: Boolean)
-    : QueryOperation = {
-    setOperationFactory.create(INTERSECT, left, right, all)
-  }
-
-  override def union(
-      left: QueryOperation,
-      right: QueryOperation,
-      all: Boolean)
-    : QueryOperation = {
-    setOperationFactory.create(UNION, left, right, all)
-  }
-
-  override def map(mapFunction: Expression, child: QueryOperation): QueryOperation = {
-
-    val resolvedMapFunction = mapFunction.accept(lookupResolver)
-
-    if (!isFunctionOfKind(resolvedMapFunction, SCALAR)) {
-      throw new ValidationException("Only a scalar function can be used in the map operator.")
-    }
-
-    val expandedFields = unresolvedCall(BuiltInFunctionDefinitions.FLATTEN, resolvedMapFunction)
-    project(Collections.singletonList(expandedFields), child)
-  }
-
-  override def flatMap(tableFunction: Expression, child: QueryOperation): QueryOperation = {
-
-    val resolvedTableFunction = tableFunction.accept(lookupResolver)
-
-    if (!isFunctionOfKind(resolvedTableFunction, TABLE)) {
-      throw new ValidationException("Only a table function can be used in the flatMap operator.")
-    }
-
-    val originFieldNames: Seq[String] =
-      resolvedTableFunction.asInstanceOf[UnresolvedCallExpression].getFunctionDefinition match {
-        case tfd: TableFunctionDefinition =>
-          UserDefinedFunctionUtils.getFieldInfo(tfd.getResultType)._1
-      }
-
-    val usedFieldNames = child.getTableSchema.getFieldNames.toBuffer
-    val newFieldNames = originFieldNames.map({ e =>
-      val resultName = getUniqueName(e, usedFieldNames)
-      usedFieldNames.append(resultName)
-      resultName
-    })
-
-    val renamedTableFunction = unresolvedCall(
-      BuiltInFunctionDefinitions.AS,
-      resolvedTableFunction +: newFieldNames.map(ApiExpressionUtils.valueLiteral(_)): _*)
-    val joinNode = joinLateral(child, renamedTableFunction, JoinType.INNER, Optional.empty())
-    val rightNode = dropColumns(
-      child.getTableSchema.getFieldNames.map(ApiExpressionUtils.unresolvedRef).toList,
-      joinNode)
-    alias(originFieldNames.map(a => unresolvedRef(a)), rightNode)
-  }
-
-  /**
-    * Return a unique name that does not exist in usedFieldNames according to the input name.
-    */
-  private def getUniqueName(inputName: String, usedFieldNames: Seq[String]): String = {
-    var i = 0
-    var resultName = inputName
-    while (usedFieldNames.contains(resultName)) {
-      resultName = resultName + "_" + i
-      i += 1
-    }
-    resultName
-  }
-
-  /**
-    * Add a default name to the call in the grouping expressions, e.g., groupBy(a % 5) to
-    * groupBy(a % 5 as TMP_0).
-    */
-  private def addAliasToTheCallInGroupings(
-      inputFieldNames: Seq[String],
-      groupingExpressions: JList[Expression])
-    : JList[Expression] = {
-
-    var attrNameCntr: Int = 0
-    val usedFieldNames = inputFieldNames.toBuffer
-    groupingExpressions.map {
-      case c: UnresolvedCallExpression
-          if c.getFunctionDefinition != BuiltInFunctionDefinitions.AS =>
-        val tempName = getUniqueName("TMP_" + attrNameCntr, usedFieldNames)
-        usedFieldNames.append(tempName)
-        attrNameCntr += 1
-        unresolvedCall(
-          BuiltInFunctionDefinitions.AS,
-          c,
-          valueLiteral(tempName)
-        )
-      case e => e
-    }
-  }
-
-  /**
-    * Rename fields in the input [[QueryOperation]].
-    */
-  private def aliasBackwardFields(
-    inputOperation: QueryOperation,
-    alias: Seq[String],
-    aliasStartIndex: Int)
-  : QueryOperation = {
-
-    if (alias.nonEmpty) {
-      val namesBeforeAlias = inputOperation.getTableSchema.getFieldNames
-      val namesAfterAlias = namesBeforeAlias.take(aliasStartIndex) ++ alias ++
-        namesBeforeAlias.takeRight(namesBeforeAlias.length - alias.size - aliasStartIndex)
-      this.alias(namesAfterAlias.map(e => unresolvedRef(e)).toList, inputOperation)
-    } else {
-      inputOperation
-    }
-  }
-
-  class NoWindowPropertyChecker(val exceptionMessage: String)
-    extends ApiExpressionDefaultVisitor[Void] {
-    override def visit(unresolvedCall: UnresolvedCallExpression): Void = {
-      val functionDefinition = unresolvedCall.getFunctionDefinition
-      if (BuiltInFunctionDefinitions.WINDOW_PROPERTIES
-        .contains(functionDefinition)) {
-        throw new ValidationException(exceptionMessage)
-      }
-      unresolvedCall.getChildren.asScala.foreach(expr => expr.accept(this))
-      null
-    }
-
-    override protected def defaultMethod(expression: Expression): Void = null
-  }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 44dc4ef..20e8ba6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -215,7 +215,8 @@ class StreamTableEnvironmentTest extends TableTestBase {
       config,
       jStreamExecEnv,
       streamPlanner,
-      executor)
+      executor,
+      true)
 
     val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
       .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index d435809..b56919d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -73,7 +73,8 @@ class AggregateTest extends TableTestBase {
       new TableConfig,
       Mockito.mock(classOf[StreamExecutionEnvironment]),
       Mockito.mock(classOf[Planner]),
-      Mockito.mock(classOf[Executor])
+      Mockito.mock(classOf[Executor]),
+      true
     )
 
     tablEnv.registerFunction("udag", new MyAgg)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index e72225d..e85e7e4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -338,7 +338,8 @@ case class StreamTableTestUtil(
     tableConfig,
     javaEnv,
     streamPlanner,
-    executor)
+    executor,
+    true)
 
   val env = new StreamExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaStreamTableEnvironmentImpl(
@@ -347,7 +348,8 @@ case class StreamTableTestUtil(
     tableConfig,
     env,
     streamPlanner,
-    executor)
+    executor,
+    true)
 
   def addTable[T: TypeInformation](
       name: String,


[flink] 03/03: [hotfix][table-api-java] Moved QueryOperation utilities to o.a.f.t.operations.utils

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cc8e44adfaab2d6e3ae4c1992b8fa73c4066c81
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Jul 2 16:46:23 2019 +0200

    [hotfix][table-api-java] Moved QueryOperation utilities to o.a.f.t.operations.utils
    
    This closes #8860
---
 .../flink/table/api/internal/TableEnvironmentImpl.java    |  2 +-
 .../org/apache/flink/table/api/internal/TableImpl.java    |  6 +++---
 .../operations/{ => utils}/OperationExpressionsUtils.java |  3 ++-
 .../operations/{ => utils}/OperationTreeBuilder.java      |  5 ++++-
 .../{ => utils}/QueryOperationDefaultVisitor.java         | 15 ++++++++++++++-
 .../utils/factories/AggregateOperationFactory.java        |  2 +-
 .../operations/utils/factories/ColumnOperationUtils.java  |  2 +-
 .../utils/factories/ProjectionOperationFactory.java       |  4 ++--
 .../apache/flink/table/plan/QueryOperationConverter.java  |  2 +-
 .../apache/flink/table/plan/QueryOperationConverter.java  |  2 +-
 .../apache/flink/table/api/internal/TableEnvImpl.scala    |  1 +
 11 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 9b04f56..e94c65a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -50,9 +50,9 @@ import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.OperationTreeBuilder;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.TableSourceQueryOperation;
+import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 79f8502..8d33e10 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -40,10 +40,10 @@ import org.apache.flink.table.expressions.resolver.LookupCallResolver;
 import org.apache.flink.table.functions.TemporalTableFunction;
 import org.apache.flink.table.functions.TemporalTableFunctionImpl;
 import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
-import org.apache.flink.table.operations.OperationExpressionsUtils;
-import org.apache.flink.table.operations.OperationExpressionsUtils.CategorizedExpressions;
-import org.apache.flink.table.operations.OperationTreeBuilder;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils;
+import org.apache.flink.table.operations.utils.OperationExpressionsUtils.CategorizedExpressions;
+import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 
 import java.util.Arrays;
 import java.util.Collections;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
similarity index 98%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
index eb2030d..9bf5b59 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationExpressionsUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationExpressionsUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.expressions.CallExpression;
@@ -30,6 +30,7 @@ import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.operations.QueryOperation;
 
 import java.util.LinkedHashMap;
 import java.util.List;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
similarity index 99%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
index 37de6da..03406de 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationTreeBuilder.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -42,7 +42,10 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.functions.TableFunctionDefinition;
+import org.apache.flink.table.operations.DistinctQueryOperation;
+import org.apache.flink.table.operations.FilterQueryOperation;
 import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
+import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow;
 import org.apache.flink.table.operations.utils.factories.AggregateOperationFactory;
 import org.apache.flink.table.operations.utils.factories.AliasOperationUtils;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/QueryOperationDefaultVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/QueryOperationDefaultVisitor.java
similarity index 73%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/QueryOperationDefaultVisitor.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/QueryOperationDefaultVisitor.java
index b1c2edc..30ce987 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/QueryOperationDefaultVisitor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/QueryOperationDefaultVisitor.java
@@ -16,9 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.AggregateQueryOperation;
+import org.apache.flink.table.operations.CalculatedQueryOperation;
+import org.apache.flink.table.operations.CatalogQueryOperation;
+import org.apache.flink.table.operations.DistinctQueryOperation;
+import org.apache.flink.table.operations.FilterQueryOperation;
+import org.apache.flink.table.operations.JoinQueryOperation;
+import org.apache.flink.table.operations.ProjectQueryOperation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.QueryOperationVisitor;
+import org.apache.flink.table.operations.SetQueryOperation;
+import org.apache.flink.table.operations.SortQueryOperation;
+import org.apache.flink.table.operations.TableSourceQueryOperation;
+import org.apache.flink.table.operations.WindowAggregateQueryOperation;
 
 /**
  * A utility {@link QueryOperationVisitor} that calls
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
index 8fef793..42c0f03 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
@@ -73,9 +73,9 @@ import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.isFunc
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
 import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
 import static org.apache.flink.table.functions.FunctionKind.TABLE_AGGREGATE;
-import static org.apache.flink.table.operations.OperationExpressionsUtils.extractName;
 import static org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow.WindowType.SLIDE;
 import static org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow.WindowType.TUMBLE;
+import static org.apache.flink.table.operations.utils.OperationExpressionsUtils.extractName;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ColumnOperationUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ColumnOperationUtils.java
index 12e12c99..c26e2fc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ColumnOperationUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ColumnOperationUtils.java
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
 import static java.lang.String.format;
 import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
-import static org.apache.flink.table.operations.OperationExpressionsUtils.extractName;
+import static org.apache.flink.table.operations.utils.OperationExpressionsUtils.extractName;
 
 /**
  * Utility class for creating projection expressions from column operation.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ProjectionOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ProjectionOperationFactory.java
index fffceb1..3834be5 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ProjectionOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ProjectionOperationFactory.java
@@ -48,8 +48,8 @@ import java.util.stream.IntStream;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.GET;
-import static org.apache.flink.table.operations.OperationExpressionsUtils.extractName;
-import static org.apache.flink.table.operations.OperationExpressionsUtils.extractNames;
+import static org.apache.flink.table.operations.utils.OperationExpressionsUtils.extractName;
+import static org.apache.flink.table.operations.utils.OperationExpressionsUtils.extractNames;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
index 0c35431..4cca7ec 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
@@ -31,13 +31,13 @@ import org.apache.flink.table.operations.JoinQueryOperation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
 import org.apache.flink.table.operations.ProjectQueryOperation;
 import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.QueryOperationDefaultVisitor;
 import org.apache.flink.table.operations.QueryOperationVisitor;
 import org.apache.flink.table.operations.RichTableSourceQueryOperation;
 import org.apache.flink.table.operations.SetQueryOperation;
 import org.apache.flink.table.operations.SortQueryOperation;
 import org.apache.flink.table.operations.TableSourceQueryOperation;
 import org.apache.flink.table.operations.WindowAggregateQueryOperation;
+import org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor;
 import org.apache.flink.table.plan.schema.DataStreamTable;
 import org.apache.flink.table.plan.schema.FlinkRelOptTable;
 import org.apache.flink.table.plan.schema.TableSourceTable;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
index 798e83e..9acfae1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
@@ -52,7 +52,6 @@ import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
 import org.apache.flink.table.operations.PlannerQueryOperation;
 import org.apache.flink.table.operations.ProjectQueryOperation;
 import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.QueryOperationDefaultVisitor;
 import org.apache.flink.table.operations.QueryOperationVisitor;
 import org.apache.flink.table.operations.ScalaDataStreamQueryOperation;
 import org.apache.flink.table.operations.SetQueryOperation;
@@ -60,6 +59,7 @@ import org.apache.flink.table.operations.SortQueryOperation;
 import org.apache.flink.table.operations.TableSourceQueryOperation;
 import org.apache.flink.table.operations.WindowAggregateQueryOperation;
 import org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow;
+import org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor;
 import org.apache.flink.table.plan.logical.LogicalWindow;
 import org.apache.flink.table.plan.logical.SessionGroupWindow;
 import org.apache.flink.table.plan.logical.SlidingGroupWindow;
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 0041141..2d37173 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
 import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _}
+import org.apache.flink.table.operations.utils.OperationTreeBuilder
 import org.apache.flink.table.operations.{CatalogQueryOperation, PlannerQueryOperation, TableSourceQueryOperation, _}
 import org.apache.flink.table.planner.PlanningConfigurationBuilder
 import org.apache.flink.table.sinks.{TableSink, TableSinkUtils}


[flink] 01/03: [hotfix][table-planner][table-api-java] Move QueryOperation factories to table-api-java

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39e4ad2097506f646a1c8a78f753628ad1debb82
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Jul 2 14:55:29 2019 +0200

    [hotfix][table-planner][table-api-java] Move QueryOperation factories to table-api-java
---
 .../operations/utils/factories}/AggregateOperationFactory.java     | 5 ++++-
 .../table/operations/utils/factories}/AliasOperationUtils.java     | 3 ++-
 .../table/operations/utils/factories}/CalculatedTableFactory.java  | 4 +++-
 .../table/operations/utils/factories}/ColumnOperationUtils.java    | 2 +-
 .../table/operations/utils/factories}/JoinOperationFactory.java    | 5 ++++-
 .../operations/utils/factories}/ProjectionOperationFactory.java    | 4 +++-
 .../table/operations/utils/factories}/SetOperationFactory.java     | 4 +++-
 .../table/operations/utils/factories}/SortOperationFactory.java    | 4 +++-
 .../apache/flink/table/operations/OperationTreeBuilderImpl.scala   | 7 ++++---
 9 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
similarity index 98%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
index ac6ee30..8fef793 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AggregateOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils.factories;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -45,6 +45,9 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionRequirement;
 import org.apache.flink.table.functions.TableAggregateFunctionDefinition;
+import org.apache.flink.table.operations.AggregateQueryOperation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.WindowAggregateQueryOperation;
 import org.apache.flink.table.operations.WindowAggregateQueryOperation.ResolvedGroupWindow;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AliasOperationUtils.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AliasOperationUtils.java
index c73ca47..a9a45d0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AliasOperationUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils.factories;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableSchema;
@@ -27,6 +27,7 @@ import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.operations.QueryOperation;
 
 import java.util.List;
 import java.util.stream.Collectors;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java
similarity index 96%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java
index 2e10ee8..fe01128 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils.factories;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -29,6 +29,8 @@ import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
+import org.apache.flink.table.operations.CalculatedQueryOperation;
+import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.typeutils.FieldInfoUtils;
 
 import java.util.Arrays;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ColumnOperationUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ColumnOperationUtils.java
similarity index 98%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ColumnOperationUtils.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ColumnOperationUtils.java
index b84d33a..12e12c99 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ColumnOperationUtils.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ColumnOperationUtils.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils.factories;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/JoinOperationFactory.java
similarity index 95%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/JoinOperationFactory.java
index 5816c00..8c4f98a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/JoinOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/JoinOperationFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils.factories;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
@@ -26,7 +26,10 @@ import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.operations.CalculatedQueryOperation;
+import org.apache.flink.table.operations.JoinQueryOperation;
 import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
+import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ProjectionOperationFactory.java
similarity index 97%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ProjectionOperationFactory.java
index 56a34ff..fffceb1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/ProjectionOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/ProjectionOperationFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils.factories;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
@@ -33,6 +33,8 @@ import org.apache.flink.table.expressions.resolver.ExpressionResolver;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.operations.ProjectQueryOperation;
+import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SetOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java
similarity index 95%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SetOperationFactory.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java
index bc791c6..9b54b78 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SetOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java
@@ -16,12 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils.factories;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.SetQueryOperation;
 import org.apache.flink.table.operations.SetQueryOperation.SetQueryOperationType;
 
 import java.util.stream.IntStream;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java
similarity index 96%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java
index e2d86e8..765fe4f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations;
+package org.apache.flink.table.operations.utils.factories;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
@@ -24,6 +24,8 @@ import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.resolver.ExpressionResolver;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.SortQueryOperation;
 
 import java.util.List;
 import java.util.stream.Collectors;
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala
index 6b60c38..03a9bc3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilderImpl.scala
@@ -21,19 +21,20 @@ package org.apache.flink.table.operations
 import java.util.{Collections, Optional, List => JList}
 import org.apache.flink.table.api._
 import org.apache.flink.table.catalog.FunctionLookup
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{isFunctionOfKind, unresolvedCall, unresolvedRef, valueLiteral}
-import org.apache.flink.table.expressions.resolver.ExpressionResolver.resolverFor
 import org.apache.flink.table.expressions._
+import org.apache.flink.table.expressions.resolver.ExpressionResolver.resolverFor
 import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
 import org.apache.flink.table.expressions.resolver.{ExpressionResolver, LookupCallResolver}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{isFunctionOfKind, unresolvedCall, unresolvedRef, valueLiteral}
 import org.apache.flink.table.expressions.utils.{ApiExpressionDefaultVisitor, ApiExpressionUtils}
 import org.apache.flink.table.functions.FunctionKind.{SCALAR, TABLE}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.functions.{AggregateFunctionDefinition, BuiltInFunctionDefinitions, TableFunctionDefinition}
-import org.apache.flink.table.operations.AliasOperationUtils.createAliasList
 import org.apache.flink.table.operations.JoinQueryOperation.JoinType
 import org.apache.flink.table.operations.OperationExpressionsUtils.extractAggregationsAndProperties
 import org.apache.flink.table.operations.SetQueryOperation.SetQueryOperationType._
+import org.apache.flink.table.operations.utils.factories.AliasOperationUtils.createAliasList
+import org.apache.flink.table.operations.utils.factories._
 import org.apache.flink.table.types.logical.LogicalTypeRoot
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
 import org.apache.flink.table.util.JavaScalaConversionUtil.toScala