Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/09 13:52:53 UTC

[flink] 01/04: [FLINK-17753] [table-planner-blink] Fix watermark defined in DDL not working in Table API

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 655545472d87e7c7d19bbc910cee45870d2b3888
Author: godfreyhe <go...@163.com>
AuthorDate: Tue May 26 17:08:56 2020 +0800

    [FLINK-17753] [table-planner-blink] Fix watermark defined in DDL not working in Table API
    
    This closes #12335
---
 .../table/catalog/hive/HiveCatalogITCase.java      |  32 +++++-
 .../table/client/gateway/local/LocalExecutor.java  |   6 ++
 .../api/internal/CatalogTableSchemaResolver.java   | 118 +++++++++++++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   |   7 +-
 .../apache/flink/table/catalog/CatalogManager.java |  34 +++++-
 .../flink/table/catalog/CatalogTableImpl.java      |   8 ++
 .../org/apache/flink/table/delegation/Parser.java  |  12 +++
 .../org/apache/flink/table/utils/ParserMock.java   |   7 ++
 .../planner/catalog/CatalogCalciteSchema.java      |   9 +-
 .../catalog/CatalogManagerCalciteSchema.java       |   8 +-
 .../table/planner/catalog/CatalogSchemaTable.java  |  11 +-
 .../planner/catalog/DatabaseCalciteSchema.java     |   7 --
 .../flink/table/planner/delegation/ParserImpl.java |  22 +++-
 .../table/planner/delegation/PlannerBase.scala     |  32 +++---
 .../table/planner/sources/TableSourceUtil.scala    |  99 ++++++-----------
 .../operations/SqlToOperationConverterTest.java    |  26 +++--
 .../plan/FlinkCalciteCatalogReaderTest.java        |  17 +--
 .../flink/table/planner/utils/PlannerMocks.java    |  38 ++++---
 .../planner/plan/stream/sql/TableSourceTest.xml    | 104 +++++++++---------
 .../plan/stream/table/LegacyTableSourceTest.xml    |  72 ++++++++++---
 .../planner/plan/stream/table/TableSourceTest.xml  | 118 ++++++++++++++-------
 .../flink/table/api/TableEnvironmentTest.scala     |   2 +-
 .../codegen/WatermarkGeneratorCodeGenTest.scala    |  30 +++++-
 .../planner/plan/stream/sql/TableSourceTest.scala  |  48 ++-------
 .../plan/stream/table/LegacyTableSourceTest.scala  |   3 +
 .../plan/stream/table/TableSourceTest.scala        |  47 +++-----
 .../org/apache/flink/table/planner/ParserImpl.java |   7 ++
 .../flink/table/api/internal/TableEnvImpl.scala    |   2 +
 .../flink/table/catalog/CatalogManagerTest.java    |   3 +
 .../table/catalog/DatabaseCalciteSchemaTest.java   |  11 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |   3 +
 31 files changed, 601 insertions(+), 342 deletions(-)
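
For context, the failure mode this commit addresses, as a minimal sketch (the table name, columns,
and elided connector options are illustrative, not taken from the patch): time attributes declared
in DDL took effect in SQL queries but were lost when the same table was read through the Table API.

    tableEnv.executeSql(
        "CREATE TABLE orders (" +
        "  price DECIMAL(10, 2)," +
        "  ts TIMESTAMP(3)," +
        "  proc AS PROCTIME()," +
        "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
        ") WITH (...)");  // connector options elided
    // Before this fix, 'proc' and the rowtime attribute on 'ts' did not survive the
    // catalog round-trip when the table was accessed via the Table API:
    tableEnv.from("orders").select($("price"), $("ts"), $("proc"));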

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index f6fa581..ba12079 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -68,6 +68,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -281,6 +282,32 @@ public class HiveCatalogITCase {
 	}
 
 	private void testReadWriteCsvWithProctime(boolean isStreaming) {
+		TableEnvironment tableEnv = prepareTable(isStreaming);
+		ArrayList<Row> rows = Lists.newArrayList(
+				tableEnv.executeSql("SELECT * FROM proctime_src").collect());
+		Assert.assertEquals(5, rows.size());
+		tableEnv.executeSql("DROP TABLE proctime_src");
+	}
+
+	@Test
+	public void testTableApiWithProctimeForBatch() {
+		testTableApiWithProctime(false);
+	}
+
+	@Test
+	public void testTableApiWithProctimeForStreaming() {
+		testTableApiWithProctime(true);
+	}
+
+	private void testTableApiWithProctime(boolean isStreaming) {
+		TableEnvironment tableEnv = prepareTable(isStreaming);
+		ArrayList<Row> rows = Lists.newArrayList(
+				tableEnv.from("proctime_src").select($("price"), $("ts"), $("l_proctime")).execute().collect());
+		Assert.assertEquals(5, rows.size());
+		tableEnv.executeSql("DROP TABLE proctime_src");
+	}
+
+	private TableEnvironment prepareTable(boolean isStreaming) {
 		EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().useBlinkPlanner();
 		if (isStreaming) {
 			builder = builder.inStreamingMode();
@@ -308,10 +335,7 @@ public class HiveCatalogITCase {
 						"'connector.path' = 'file://%s'," +
 						"'format.type' = 'csv')", srcPath));
 
-		ArrayList<Row> rows = Lists.newArrayList(
-				tableEnv.executeSql("SELECT * FROM proctime_src").collect());
-		Assert.assertEquals(5, rows.size());
-		tableEnv.executeSql("DROP TABLE proctime_src");
+		return tableEnv;
 	}
 
 	@Test
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 604734e..5caa2a9 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -57,6 +57,7 @@ import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
 import org.apache.flink.table.client.gateway.local.result.DynamicResult;
 import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
@@ -473,6 +474,11 @@ public class LocalExecutor implements Executor {
 			public UnresolvedIdentifier parseIdentifier(String identifier) {
 				return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));
 			}
+
+			@Override
+			public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+				return context.wrapClassLoader(() -> parser.parseSqlExpression(sqlExpression, inputSchema));
+			}
 		};
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
new file mode 100644
index 0000000..3e1e08a
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive the correct result type of a computed
+ * column, because the data type of a computed column coming from a catalog table is not trusted.
+ *
+ * <p>For example, the type of the `proctime()` function in the given TableSchema is TIMESTAMP(3),
+ * but its correct type is TIMESTAMP(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+	private final Parser parser;
+	// A flag to indicate whether the table environment should work in batch or streaming mode
+	// TODO remove this once FLINK-18180 is finished
+	private final boolean isStreamingMode;
+
+	public CatalogTableSchemaResolver(Parser parser, boolean isStreamingMode) {
+		this.parser = parser;
+		this.isStreamingMode = isStreamingMode;
+	}
+
+	/**
+	 * Resolves the computed column types (and the rowtime attribute type) for the given schema.
+	 *
+	 * @param tableSchema Table schema to derive table field names and data types
+	 * @return the resolved TableSchema
+	 */
+	public TableSchema resolve(TableSchema tableSchema) {
+		final String rowtime;
+		if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+			// TODO: [FLINK-14473] we only support top-level rowtime attribute right now
+			rowtime = tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
+			if (rowtime.contains(".")) {
+				throw new ValidationException(
+						String.format("Nested field '%s' as rowtime attribute is not supported right now.", rowtime));
+			}
+		} else {
+			rowtime = null;
+		}
+
+		String[] fieldNames = tableSchema.getFieldNames();
+		DataType[] fieldTypes = tableSchema.getFieldDataTypes();
+
+		TableSchema.Builder builder = TableSchema.builder();
+		for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
+			TableColumn tableColumn = tableSchema.getTableColumns().get(i);
+			DataType fieldType = fieldTypes[i];
+			if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
+				if (fieldNames[i].equals(rowtime)) {
+					throw new TableException("Watermark can not be defined for a processing time attribute column.");
+				}
+				TimestampType originalType = (TimestampType) fieldType.getLogicalType();
+				LogicalType proctimeType = new TimestampType(
+						originalType.isNullable(),
+						TimestampKind.PROCTIME,
+						originalType.getPrecision());
+				fieldType = TypeConversions.fromLogicalToDataType(proctimeType);
+			} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {
+				TimestampType originalType = (TimestampType) fieldType.getLogicalType();
+				LogicalType rowtimeType = new TimestampType(
+						originalType.isNullable(),
+						TimestampKind.ROWTIME,
+						originalType.getPrecision());
+				fieldType = TypeConversions.fromLogicalToDataType(rowtimeType);
+			}
+			if (tableColumn.isGenerated()) {
+				builder.field(fieldNames[i], fieldType, tableColumn.getExpr().get());
+			} else {
+				builder.field(fieldNames[i], fieldType);
+			}
+		}
+
+		tableSchema.getWatermarkSpecs().forEach(builder::watermark);
+		tableSchema.getPrimaryKey().ifPresent(
+				pk -> builder.primaryKey(pk.getName(), pk.getColumns().toArray(new String[0])));
+		return builder.build();
+	}
+
+	private boolean isProctimeType(String expr, TableSchema tableSchema) {
+		ResolvedExpression resolvedExpr = parser.parseSqlExpression(expr, tableSchema);
+		if (resolvedExpr == null) {
+			return false;
+		}
+		LogicalType type = resolvedExpr.getOutputDataType().getLogicalType();
+		return type instanceof TimestampType && ((TimestampType) type).getKind() == TimestampKind.PROCTIME;
+	}
+}
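
The resolver is wired into CatalogManager (see below) and applied on every table lookup. A rough
usage sketch, with hypothetical variable names:

    // 'parser' comes from the planner; 'true' means streaming mode
    CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser, true);
    TableSchema resolved = resolver.resolve(catalogTable.getSchema());
    // a computed PROCTIME() column now carries TIMESTAMP(3) *PROCTIME*; in streaming mode
    // the watermark's rowtime column carries TIMESTAMP(3) *ROWTIME*
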
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 3d48655..6edce4a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -199,6 +199,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 			Planner planner,
 			boolean isStreamingMode) {
 		this.catalogManager = catalogManager;
+		this.catalogManager.setCatalogTableSchemaResolver(
+				new CatalogTableSchemaResolver(planner.getParser(), isStreamingMode));
 		this.moduleManager = moduleManager;
 		this.execEnv = executor;
 
@@ -490,8 +492,9 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-		return catalogManager.getTable(tableIdentifier)
-			.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+		return catalogManager.getTable(tableIdentifier).map(t -> {
+			return new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema());
+		});
 	}
 
 	@Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 44dc6c3..9c1e1d3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -23,12 +23,18 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.CatalogNotExistException;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
@@ -70,6 +76,8 @@ public final class CatalogManager {
 
 	private String currentDatabaseName;
 
+	private CatalogTableSchemaResolver schemaResolver;
+
 	// The name of the built-in catalog
 	private final String builtInCatalogName;
 
@@ -147,6 +155,16 @@ public final class CatalogManager {
 	}
 
 	/**
+	 * We do not pass the resolver in the constructor, because we need a {@link Parser} that is
+	 * constructed in a {@link Planner}. At the same time a {@link Planner} needs a
+	 * {@link CatalogManager} to be constructed. Thus we can't get a {@link Parser} instance
+	 * when creating a {@link CatalogManager}. See {@link TableEnvironmentImpl#create}.
+	 */
+	public void setCatalogTableSchemaResolver(CatalogTableSchemaResolver schemaResolver) {
+		this.schemaResolver = schemaResolver;
+	}
+
+	/**
 	 * Returns a factory for creating fully resolved data types that can be used for planning.
 	 */
 	public DataTypeFactory getDataTypeFactory() {
@@ -336,12 +354,24 @@ public final class CatalogManager {
 	 * @return table that the path points to.
 	 */
 	public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
+		Preconditions.checkNotNull(schemaResolver, "schemaResolver should not be null");
 		CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
 		if (temporaryTable != null) {
-			return Optional.of(TableLookupResult.temporary(temporaryTable));
+			return Optional.of(TableLookupResult.temporary(resolveTableSchema(temporaryTable)));
 		} else {
-			return getPermanentTable(objectIdentifier);
+			Optional<TableLookupResult> result = getPermanentTable(objectIdentifier);
+			return result.map(tableLookupResult ->
+					TableLookupResult.permanent(resolveTableSchema(tableLookupResult.getTable())));
+		}
+	}
+
+	private CatalogBaseTable resolveTableSchema(CatalogBaseTable table) {
+		if (!(table instanceof CatalogTableImpl)) {
+			return table;
 		}
+		CatalogTableImpl catalogTableImpl = (CatalogTableImpl) table;
+		TableSchema newTableSchema = schemaResolver.resolve(catalogTableImpl.getSchema());
+		return catalogTableImpl.copy(newTableSchema);
 	}
 
 	/**
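
The circular dependency described in the setCatalogTableSchemaResolver javadoc above dictates the
initialization order; condensed, this is what TableEnvironmentImpl (diff above) effectively does:

    CatalogManager catalogManager = ...;   // built first, without a resolver
    Planner planner = ...;                 // the planner is constructed against the catalog manager
    catalogManager.setCatalogTableSchemaResolver(
            new CatalogTableSchemaResolver(planner.getParser(), isStreamingMode));
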
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index 9ac9c5c..dfe5d8b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -88,6 +88,14 @@ public class CatalogTableImpl extends AbstractCatalogTable {
 		return new CatalogTableImpl(getSchema(), getPartitionKeys(), options, getComment());
 	}
 
+	public CatalogTable copy(TableSchema tableSchema) {
+		return new CatalogTableImpl(
+				tableSchema.copy(),
+				new ArrayList<>(getPartitionKeys()),
+				new HashMap<>(getProperties()),
+				getComment());
+	}
+
 	/**
 	 * Construct a {@link CatalogTableImpl} from complete properties that contains table schema.
 	 */
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
index c1df477..a94c152 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 
@@ -54,4 +56,14 @@ public interface Parser {
 	 * @throws org.apache.flink.table.api.SqlParserException when failed to parse the identifier
 	 */
 	UnresolvedIdentifier parseIdentifier(String identifier);
+
+	/**
+	 * Entry point for parsing SQL expressions expressed as a String.
+	 *
+	 * @param sqlExpression the SQL expression to parse
+	 * @param inputSchema the schema of the fields in sql expression
+	 * @return resolved expression
+	 * @throws org.apache.flink.table.api.SqlParserException when failed to parse the sql expression
+	 */
+	ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema);
 }
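
A brief sketch of the intended use of the new method (the schema and expression are illustrative):

    TableSchema schema = TableSchema.builder()
            .field("ts", DataTypes.TIMESTAMP(3))
            .build();
    ResolvedExpression expr = parser.parseSqlExpression("PROCTIME()", schema);
    // for PROCTIME(), the resolved output type is a TimestampType of kind PROCTIME,
    // which is exactly what CatalogTableSchemaResolver#isProctimeType checks
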
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
index 7507b00..4ec00e8 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ParserMock.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.table.utils;
 
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.operations.Operation;
 
 import java.util.List;
@@ -37,4 +39,9 @@ public class ParserMock implements Parser {
 	public UnresolvedIdentifier parseIdentifier(String identifier) {
 		return UnresolvedIdentifier.of(identifier);
 	}
+
+	@Override
+	public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+		return null;
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogCalciteSchema.java
index a216097..7331326 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogCalciteSchema.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.schema.Schema;
@@ -43,18 +42,13 @@ public class CatalogCalciteSchema extends FlinkSchema {
 	private final CatalogManager catalogManager;
 	// Flag that tells if the current planner should work in a batch or streaming mode.
 	private final boolean isStreamingMode;
-	// The SQL expression converter factory is used to derive correct result type of computed column,
-	// because the date type of computed column from catalog table is not trusted.
-	private final SqlExprToRexConverterFactory converterFactory;
 
 	public CatalogCalciteSchema(
 			String catalogName,
 			CatalogManager catalog,
-			SqlExprToRexConverterFactory converterFactory,
 			boolean isStreamingMode) {
 		this.catalogName = catalogName;
 		this.catalogManager = catalog;
-		this.converterFactory = converterFactory;
 		this.isStreamingMode = isStreamingMode;
 	}
 
@@ -67,8 +61,7 @@ public class CatalogCalciteSchema extends FlinkSchema {
 	@Override
 	public Schema getSubSchema(String schemaName) {
 		if (catalogManager.schemaExists(catalogName, schemaName)) {
-			return new DatabaseCalciteSchema(
-					schemaName, catalogName, catalogManager, converterFactory, isStreamingMode);
+			return new DatabaseCalciteSchema(schemaName, catalogName, catalogManager, isStreamingMode);
 		} else {
 			return null;
 		}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogManagerCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogManagerCalciteSchema.java
index 84e7671..a0dd7c7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogManagerCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogManagerCalciteSchema.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.planner.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.schema.Schema;
@@ -45,17 +44,12 @@ public class CatalogManagerCalciteSchema extends FlinkSchema {
 	private final CatalogManager catalogManager;
 	// Flag that tells if the current planner should work in a batch or streaming mode.
 	private final boolean isStreamingMode;
-	// The SQL expression converter factory is used to derive correct result type of computed column,
-	// because the date type of computed column from catalog table is not trusted.
-	private final SqlExprToRexConverterFactory converterFactory;
 
 	public CatalogManagerCalciteSchema(
 			CatalogManager catalogManager,
-			SqlExprToRexConverterFactory converterFactory,
 			boolean isStreamingMode) {
 		this.catalogManager = catalogManager;
 		this.isStreamingMode = isStreamingMode;
-		this.converterFactory = converterFactory;
 	}
 
 	@Override
@@ -71,7 +65,7 @@ public class CatalogManagerCalciteSchema extends FlinkSchema {
 	@Override
 	public Schema getSubSchema(String name) {
 		if (catalogManager.schemaExists(name)) {
-			return new CatalogCalciteSchema(name, catalogManager, converterFactory, isStreamingMode);
+			return new CatalogCalciteSchema(name, catalogManager, isStreamingMode);
 		} else {
 			return null;
 		}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
index e0b2217..23a8d80 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
@@ -34,7 +34,6 @@ import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 import org.apache.flink.table.planner.sources.TableSourceUtil;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -70,7 +69,6 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
 	private final ObjectIdentifier tableIdentifier;
 	private final CatalogBaseTable catalogBaseTable;
 	private final FlinkStatistic statistic;
-	private final SqlExprToRexConverterFactory converterFactory;
 	private final boolean isStreamingMode;
 	private final boolean isTemporary;
 	private final Catalog catalog;
@@ -84,9 +82,6 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
 	 * @param catalogBaseTable CatalogBaseTable instance which exists in the catalog
 	 * @param statistic Table statistics
 	 * @param catalog The catalog which the schema table belongs to
-	 * @param converterFactory The SQL expression converter factory is used to derive correct result
-	 *                         type of computed column, because the date type of computed column
-	 *                         from catalog table is not trusted.
 	 * @param isStreaming If the table is for streaming mode
 	 * @param isTemporary If the table is temporary
 	 */
@@ -95,14 +90,12 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
 			CatalogBaseTable catalogBaseTable,
 			FlinkStatistic statistic,
 			Catalog catalog,
-			SqlExprToRexConverterFactory converterFactory,
 			boolean isStreaming,
 			boolean isTemporary) {
 		this.tableIdentifier = tableIdentifier;
 		this.catalogBaseTable = catalogBaseTable;
 		this.statistic = statistic;
 		this.catalog = catalog;
-		this.converterFactory = converterFactory;
 		this.isStreamingMode = isStreaming;
 		this.isTemporary = isTemporary;
 	}
@@ -186,10 +179,10 @@ public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
 			}
 		}
 
-		return TableSourceUtil.getSourceRowTypeFromSchema(
-				converterFactory,
+		return TableSourceUtil.getSourceRowType(
 				flinkTypeFactory,
 				tableSchema,
+				scala.Option.empty(),
 				isStreamingMode);
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
index 51102af..66ee213 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.plan.stats.TableStats;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 
 import org.apache.calcite.linq4j.tree.Expression;
@@ -56,9 +55,6 @@ class DatabaseCalciteSchema extends FlinkSchema {
 	private final String databaseName;
 	private final String catalogName;
 	private final CatalogManager catalogManager;
-	// The SQL expression converter factory is used to derive correct result type of computed column,
-	// because the date type of computed column from catalog table is not trusted.
-	private final SqlExprToRexConverterFactory converterFactory;
 	// Flag that tells if the current planner should work in a batch or streaming mode.
 	private final boolean isStreamingMode;
 
@@ -66,12 +62,10 @@ class DatabaseCalciteSchema extends FlinkSchema {
 			String databaseName,
 			String catalogName,
 			CatalogManager catalog,
-			SqlExprToRexConverterFactory converterFactory,
 			boolean isStreamingMode) {
 		this.databaseName = databaseName;
 		this.catalogName = catalogName;
 		this.catalogManager = catalog;
-		this.converterFactory = converterFactory;
 		this.isStreamingMode = isStreamingMode;
 	}
 
@@ -87,7 +81,6 @@ class DatabaseCalciteSchema extends FlinkSchema {
 					table,
 					statistic,
 					catalogManager.getCatalog(catalogName).orElseThrow(IllegalStateException::new),
-					converterFactory,
 					isStreamingMode,
 					result.isTemporary());
 			})
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
index 466587e..91723f7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
@@ -19,19 +19,28 @@
 package org.apache.flink.table.planner.delegation;
 
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.planner.calcite.CalciteParser;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
 import org.apache.flink.table.planner.operations.SqlToOperationConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
 
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
@@ -46,14 +55,17 @@ public class ParserImpl implements Parser {
 	// multiple statements parsing
 	private final Supplier<FlinkPlannerImpl> validatorSupplier;
 	private final Supplier<CalciteParser> calciteParserSupplier;
+	private final Function<TableSchema, SqlExprToRexConverter> sqlExprToRexConverterCreator;
 
 	public ParserImpl(
 			CatalogManager catalogManager,
 			Supplier<FlinkPlannerImpl> validatorSupplier,
-			Supplier<CalciteParser> calciteParserSupplier) {
+			Supplier<CalciteParser> calciteParserSupplier,
+			Function<TableSchema, SqlExprToRexConverter> sqlExprToRexConverterCreator) {
 		this.catalogManager = catalogManager;
 		this.validatorSupplier = validatorSupplier;
 		this.calciteParserSupplier = calciteParserSupplier;
+		this.sqlExprToRexConverterCreator = sqlExprToRexConverterCreator;
 	}
 
 	@Override
@@ -74,4 +86,12 @@ public class ParserImpl implements Parser {
 		SqlIdentifier sqlIdentifier = parser.parseIdentifier(identifier);
 		return UnresolvedIdentifier.of(sqlIdentifier.names);
 	}
+
+	@Override
+	public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+		SqlExprToRexConverter sqlExprToRexConverter = sqlExprToRexConverterCreator.apply(inputSchema);
+		RexNode rexNode = sqlExprToRexConverter.convertToRexNode(sqlExpression);
+		LogicalType logicalType = FlinkTypeFactory.toLogicalType(rexNode.getType());
+		return new RexNodeExpression(rexNode, TypeConversions.fromLogicalToDataType(logicalType));
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 2fe0001..451f4d7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
+import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException, TableSchema}
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.connector.sink.DynamicTableSink
 import org.apache.flink.table.delegation.{Executor, Parser, Planner}
@@ -31,7 +31,7 @@ import org.apache.flink.table.factories.{FactoryUtil, TableFactoryUtil}
 import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
 import org.apache.flink.table.operations._
 import org.apache.flink.table.planner.JMap
-import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, SqlExprToRexConverter, SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite._
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
 import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
 import org.apache.flink.table.planner.hint.FlinkHints
@@ -55,7 +55,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.tools.FrameworkConfig
 
 import java.util
-import java.util.function.{Supplier => JSupplier}
+import java.util.function.{Function => JFunction, Supplier => JSupplier}
 
 import _root_.scala.collection.JavaConversions._
 
@@ -89,17 +89,6 @@ abstract class PlannerBase(
       plannerContext.createSqlExprToRexConverter(tableRowType)
   }
 
-  @VisibleForTesting
-  private[flink] val plannerContext: PlannerContext =
-    new PlannerContext(
-      config,
-      functionCatalog,
-      catalogManager,
-      asRootSchema(new CatalogManagerCalciteSchema(
-        catalogManager, sqlExprToRexConverterFactory, isStreamingMode)),
-      getTraitDefs.toList
-    )
-
   private val parser: Parser = new ParserImpl(
     catalogManager,
     new JSupplier[FlinkPlannerImpl] {
@@ -110,9 +99,24 @@ abstract class PlannerBase(
     // parsing statements
     new JSupplier[CalciteParser] {
       override def get(): CalciteParser = plannerContext.createCalciteParser()
+    },
+    new JFunction[TableSchema, SqlExprToRexConverter] {
+      override def apply(t: TableSchema): SqlExprToRexConverter = {
+        sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory.buildRelNodeRowType(t))
+      }
     }
   )
 
+  @VisibleForTesting
+  private[flink] val plannerContext: PlannerContext =
+    new PlannerContext(
+      config,
+      functionCatalog,
+      catalogManager,
+      asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
+      getTraitDefs.toList
+    )
+
   /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
   private[flink] def getRelBuilder: FlinkRelBuilder = {
     val currentCatalogName = catalogManager.getCurrentCatalog
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index 9d8880e..39eb5b6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -18,14 +18,12 @@
 
 package org.apache.flink.table.planner.sources
 
-import org.apache.flink.table.api.{DataTypes, TableColumn, TableSchema, ValidationException, WatermarkSpec}
+import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException, WatermarkSpec}
 import org.apache.flink.table.expressions.ApiExpressionUtils.{typeLiteral, valueLiteral}
 import org.apache.flink.table.expressions.{CallExpression, Expression, ResolvedExpression, ResolvedFieldReference}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
-import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.runtime.types.DataTypePrecisionFixer
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
@@ -39,7 +37,7 @@ import org.apache.calcite.plan.RelOptCluster
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
 import _root_.java.sql.Timestamp
@@ -119,67 +117,6 @@ object TableSourceUtil {
   }
 
   /**
-    * Returns schema of the selected fields of the given [[TableSchema]].
-    *
-    * @param converterFactory converter to convert computed columns.
-    * @param typeFactory Type factory to create the type
-    * @param tableSchema Table schema to derive table field names and data types
-    * @param streaming Flag to determine whether the schema of a stream or batch table is created
-    * @return The row type for the selected fields of the given [[TableSchema]], this type would
-    *         also be patched with time attributes defined in the give [[WatermarkSpec]]
-    */
-  def getSourceRowTypeFromSchema(
-      converterFactory: SqlExprToRexConverterFactory,
-      typeFactory: FlinkTypeFactory,
-      tableSchema: TableSchema,
-      streaming: Boolean): RelDataType = {
-    val fieldNames = tableSchema.getFieldNames
-    var fieldTypes = tableSchema.getFieldDataTypes.map(fromDataTypeToLogicalType)
-
-    // TODO: [FLINK-14473] we only support top-level rowtime attribute right now
-    val rowTimeIdx = if (tableSchema.getWatermarkSpecs.nonEmpty) {
-      val rowtime = tableSchema.getWatermarkSpecs.head.getRowtimeAttribute
-      if (rowtime.contains(".")) {
-        throw new ValidationException(
-          s"Nested field '$rowtime' as rowtime attribute is not supported right now.")
-      }
-      Some(fieldNames.indexOf(rowtime))
-    } else {
-      None
-    }
-
-    val converter = converterFactory.create(
-      typeFactory.buildRelNodeRowType(fieldNames, fieldTypes))
-    def isProctime(column: TableColumn): Boolean = {
-      toScala(column.getExpr).exists { expr =>
-        converter.convertToRexNode(expr) match {
-          case call: RexCall => call.getOperator == FlinkSqlOperatorTable.PROCTIME
-          case _ => false
-        }
-      }
-    }
-
-    fieldTypes = fieldTypes.zipWithIndex.map {
-      case (originalType, i) =>
-        if (streaming && rowTimeIdx.exists(_.equals(i))) {
-          new TimestampType(
-            originalType.isNullable,
-            TimestampKind.ROWTIME,
-            originalType.asInstanceOf[TimestampType].getPrecision)
-        } else if (isProctime(tableSchema.getTableColumn(i).get())) {
-          new TimestampType(
-            originalType.isNullable,
-            TimestampKind.PROCTIME,
-            originalType.asInstanceOf[TimestampType].getPrecision)
-        } else {
-          originalType
-        }
-    }
-
-    typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
-  }
-
-  /**
     * Returns schema of the selected fields of the given [[TableSource]].
     *
     * <p> The watermark strategy specifications should either come from the [[TableSchema]]
@@ -224,6 +161,36 @@ object TableSourceUtil {
   }
 
   /**
+   * Returns the schema of the selected fields of the given [[TableSource]].
+   *
+   * <p> The watermark strategy specifications should come from either the [[TableSchema]]
+   * or the [[TableSource]].
+   *
+   * @param typeFactory Type factory to create the type
+   * @param tableSchema Table schema to derive table field names and data types
+   * @param tableSource Table source to derive watermark strategies
+   * @param streaming Flag to determine whether the schema of a stream or batch table is created
+   * @return The row type for the selected fields of the given [[TableSource]]. This type is
+   *         also patched with the time attributes defined in the given [[WatermarkSpec]]
+   */
+  def getSourceRowType(
+      typeFactory: FlinkTypeFactory,
+      tableSchema: TableSchema,
+      tableSource: Option[TableSource[_]],
+      streaming: Boolean): RelDataType = {
+
+    val fieldNames = tableSchema.getFieldNames
+    val fieldDataTypes = tableSchema.getFieldDataTypes
+
+    if (tableSource.isDefined) {
+      getSourceRowTypeFromSource(typeFactory, tableSource.get, streaming)
+    } else {
+      val fieldTypes = fieldDataTypes.map(fromDataTypeToLogicalType)
+      typeFactory.buildRelNodeRowType(fieldNames, fieldTypes)
+    }
+  }
+
+  /**
     * Returns the [[RowtimeAttributeDescriptor]] of a [[TableSource]].
     *
     * @param tableSource The [[TableSource]] for which the [[RowtimeAttributeDescriptor]] is
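
Note that the time-attribute patching previously done by getSourceRowTypeFromSchema now happens
earlier, in CatalogTableSchemaResolver, so the schema reaching the planner already carries the
PROCTIME/ROWTIME timestamp kinds; getSourceRowType merely converts the schema, or delegates to the
TableSource variant, as in the CatalogSchemaTable call above:

    // from CatalogSchemaTable; Option.empty() because no TableSource is involved there
    TableSourceUtil.getSourceRowType(flinkTypeFactory, tableSchema, scala.Option.empty(), isStreamingMode);
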
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index e9244ff..1c582a3 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
@@ -41,6 +42,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -56,9 +58,8 @@ import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
 import org.apache.flink.table.planner.calcite.CalciteParser;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
 import org.apache.flink.table.planner.expressions.utils.Func0$;
 import org.apache.flink.table.planner.expressions.utils.Func1$;
@@ -67,7 +68,6 @@ import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctio
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlNode;
 import org.junit.After;
 import org.junit.Before;
@@ -84,6 +84,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
@@ -102,6 +103,7 @@ import static org.junit.Assert.assertThat;
  * Test cases for {@link SqlToOperationConverter}.
  */
 public class SqlToOperationConverterTest {
+	private final boolean isStreamingMode = false;
 	private final TableConfig tableConfig = new TableConfig();
 	private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog",
 		"default");
@@ -113,16 +115,25 @@ public class SqlToOperationConverterTest {
 		tableConfig,
 		catalogManager,
 		moduleManager);
-	private SqlExprToRexConverterFactory sqlExprToRexConverterFactory = this::createSqlExprToRexConverter;
+	private final Supplier<FlinkPlannerImpl> plannerSupplier =
+		() -> getPlannerContext().createFlinkPlanner(
+				catalogManager.getCurrentCatalog(),
+				catalogManager.getCurrentDatabase());
+	private final Parser parser = new ParserImpl(
+		catalogManager,
+		plannerSupplier,
+		() -> plannerSupplier.get().parser(),
+		t -> getPlannerContext().createSqlExprToRexConverter(
+				getPlannerContext().getTypeFactory().buildRelNodeRowType(t)));
 	private final PlannerContext plannerContext =
 		new PlannerContext(tableConfig,
 			functionCatalog,
 			catalogManager,
-			asRootSchema(new CatalogManagerCalciteSchema(catalogManager, sqlExprToRexConverterFactory, false)),
+			asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
 			new ArrayList<>());
 
-	private SqlExprToRexConverter createSqlExprToRexConverter(RelDataType t) {
-		return plannerContext.createSqlExprToRexConverter(t);
+	private PlannerContext getPlannerContext() {
+		return plannerContext;
 	}
 
 	@Rule
@@ -130,6 +141,7 @@ public class SqlToOperationConverterTest {
 
 	@Before
 	public void before() throws TableAlreadyExistException, DatabaseNotExistException {
+		catalogManager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(parser, isStreamingMode));
 		final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
 		final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
 		final TableSchema tableSchema = TableSchema.builder()
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
index 296e7ce..3ec52507 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
 import org.apache.flink.table.planner.catalog.CatalogSchemaTable;
 import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
@@ -34,7 +33,6 @@ import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.prepare.Prepare;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.junit.Before;
@@ -76,22 +74,9 @@ public class FlinkCalciteCatalogReaderTest {
 		// Mock CatalogSchemaTable.
 		CatalogSchemaTable mockTable = new CatalogSchemaTable(
 			ObjectIdentifier.of("a", "b", "c"),
-			ConnectorCatalogTable.source(
-				new TestTableSource(true, TableSchema.builder().build()),
-				true),
+			ConnectorCatalogTable.source(new TestTableSource(true, TableSchema.builder().build()), true),
 			FlinkStatistic.UNKNOWN(),
 			null,
-			tableRowType -> new SqlExprToRexConverter() {
-				@Override
-				public RexNode convertToRexNode(String expr) {
-					return null;
-				}
-
-				@Override
-				public RexNode[] convertToRexNodes(String[] exprs) {
-					return new RexNode[0];
-				}
-			},
 			true,
 			false);
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
index e3f15b1..7252ff4 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
@@ -19,16 +19,18 @@
 package org.apache.flink.table.planner.utils;
 
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
 
@@ -37,25 +39,31 @@ import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
  */
 public class PlannerMocks {
 	public static FlinkPlannerImpl createDefaultPlanner() {
+		final boolean isStreamingMode = false;
 		TableConfig tableConfig = new TableConfig();
 		CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
 		ModuleManager moduleManager = new ModuleManager();
 		FunctionCatalog functionCatalog = new FunctionCatalog(
-			tableConfig,
-			catalogManager,
-			moduleManager);
-		AtomicReference<PlannerContext> reference = new AtomicReference<>();
+				tableConfig,
+				catalogManager,
+				moduleManager);
 		PlannerContext plannerContext = new PlannerContext(
-			tableConfig,
-			functionCatalog,
-			catalogManager,
-			asRootSchema(new CatalogManagerCalciteSchema(
-					catalogManager, t -> reference.get().createSqlExprToRexConverter(t), false)),
-			new ArrayList<>());
-		reference.set(plannerContext);
-		return plannerContext.createFlinkPlanner(
-			catalogManager.getCurrentCatalog(),
-			catalogManager.getCurrentDatabase());
+				tableConfig,
+				functionCatalog,
+				catalogManager,
+				asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
+				new ArrayList<>());
+		FlinkPlannerImpl planner = plannerContext.createFlinkPlanner(
+				catalogManager.getCurrentCatalog(),
+				catalogManager.getCurrentDatabase());
+		Parser parser = new ParserImpl(
+				catalogManager,
+				() -> planner,
+				planner::parser,
+				t -> plannerContext.createSqlExprToRexConverter(plannerContext.getTypeFactory().buildRelNodeRowType(t))
+		);
+		catalogManager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(parser, isStreamingMode));
+		return planner;
 	}
 
 	private PlannerMocks() {
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index 965fc3e..efe7f47 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -40,24 +40,24 @@ Calc(select=[id, deepNested.nested1.name AS nestedName, nested.value AS nestedVa
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProcTimeTableSourceSimple">
+  <TestCase name="testProjectOnlyRowtime">
     <Resource name="sql">
-      <![CDATA[SELECT pTime, id, name, val FROM procTimeT]]>
+      <![CDATA[SELECT rtime FROM T]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(pTime=[$3], id=[$0], name=[$2], val=[$1])
-+- LogicalWatermarkAssigner(rowtime=[pTime], watermark=[$3])
-   +- LogicalProject(id=[$0], val=[$1], name=[$2], pTime=[PROCTIME()])
-      +- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
+LogicalProject(rtime=[$1])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+   +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[pTime, id, name, val])
-+- WatermarkAssigner(rowtime=[pTime], watermark=[pTime])
-   +- Calc(select=[id, val, name, PROCTIME() AS pTime])
-      +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
+Calc(select=[rtime])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+   +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+      +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
 ]]>
     </Resource>
   </TestCase>
@@ -80,48 +80,6 @@ GroupAggregate(select=[COUNT(*) AS EXPR$0])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProjectWithoutRowtime">
-    <Resource name="sql">
-      <![CDATA[SELECT ptime, name, val, id FROM T]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
-+- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
-   +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
-+- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
-   +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
-      +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testProjectWithRowtimeProctime">
-    <Resource name="sql">
-      <![CDATA[SELECT name, val, id FROM T]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject(name=[$3], val=[$2], id=[$0])
-+- LogicalWatermarkAssigner(rowtime=[ptime], watermark=[$4])
-   +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[name, val, id])
-+- WatermarkAssigner(rowtime=[ptime], watermark=[ptime])
-   +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
-      +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testRowTimeTableSourceGroupWindow">
     <Resource name="sql">
       <![CDATA[
@@ -153,6 +111,27 @@ Calc(select=[name, w$end AS EXPR$1, EXPR$2])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testProjectWithoutProctime">
+    <Resource name="sql">
+      <![CDATA[select name, val, rtime, id from T]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(name=[$3], val=[$2], rtime=[$1], id=[$0])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+   +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[name, val, rtime, id])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+   +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+      +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testTableSourceWithTimestampRowTimeField">
     <Resource name="sql">
       <![CDATA[SELECT rowtime, id, name, val FROM rowTimeT]]>
@@ -172,4 +151,25 @@ Calc(select=[rowtime, id, name, val])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testProjectWithoutRowtime">
+    <Resource name="sql">
+      <![CDATA[SELECT ptime, name, val, id FROM T]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+   +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+   +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+      +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.xml
index 8090ffb..a0b9a14 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.xml
@@ -61,45 +61,45 @@ Calc(select=[PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, val])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProjectWithMapping">
+  <TestCase name="testProjectOnlyProctime">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(name=[$4], rtime=[$1], val=[$2])
-+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: p-rtime, p-id, p-name, p-val)]]])
+LogicalProject(ptime=[$3])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[name, rtime, val])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: p-rtime, p-id, p-name, p-val)]]], fields=[id, rtime, val, ptime, name])
+Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]], fields=[id, rtime, val, ptime, name])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProjectWithoutRowtime">
+  <TestCase name="testProjectOnlyRowtime">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(ptime=[$3], name=[$4], val=[$2], id=[$0])
-+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]])
+LogicalProject(rtime=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]], fields=[id, rtime, val, ptime, name])
+Calc(select=[rtime])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]], fields=[id, rtime, val, ptime, name])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProjectWithRowtimeProctime">
+  <TestCase name="testProjectWithMapping">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(name=[$4], val=[$2], id=[$0])
-+- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]])
+LogicalProject(name=[$4], rtime=[$1], val=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: p-rtime, p-id, p-name, p-val)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[name, val, id])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]], fields=[id, rtime, val, ptime, name])
+Calc(select=[name, rtime, val])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: p-rtime, p-id, p-name, p-val)]]], fields=[id, rtime, val, ptime, name])
 ]]>
     </Resource>
   </TestCase>
@@ -122,6 +122,34 @@ Calc(select=[name, EXPR$0, EXPR$1])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testProjectWithoutProctime">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(name=[$4], val=[$2], rtime=[$1], id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[name, val, rtime, id])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, rtime, val, name)]]], fields=[id, rtime, val, ptime, name])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testProjectWithRowtimeProctime">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(name=[$4], val=[$2], id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[name, val, id])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]], fields=[id, rtime, val, ptime, name])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testTableSourceWithTimestampRowTimeField">
     <Resource name="planBefore">
       <![CDATA[
@@ -136,6 +164,20 @@ Calc(select=[rowtime, id, name, val])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testProjectWithoutRowtime">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(ptime=[$3], name=[$4], val=[$2], id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestSource(physical fields: id, name, val, rtime)]]], fields=[id, rtime, val, ptime, name])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testTableSourceWithLongRowTimeField">
     <Resource name="planBefore">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
index b6dea7e..36c09f5 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
@@ -16,58 +16,81 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testTableSourceWithTimestampRowTimeField">
+  <TestCase name="testNestedProject">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(rowtime=[$1], id=[$0], name=[$3], val=[$2])
-+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]])
+LogicalProject(id=[$0], nestedName=[$1.nested1.name], nestedValue=[$2.value], nestedFlag=[$1.nested2.flag], nestedNum=[$1.nested2.num])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[rowtime, id, name, val])
-+- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
-   +- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])
+Calc(select=[id, deepNested.nested1.name AS nestedName, nested.value AS nestedValue, deepNested.nested2.flag AS nestedFlag, deepNested.nested2.num AS nestedNum])
++- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested, nested]]], fields=[id, deepNested, nested])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testNestedProject">
+  <TestCase name="testProcTimeTableSourceOverWindow">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(id=[$0], nestedName=[$1.nested1.name], nestedValue=[$2.value], nestedFlag=[$1.nested2.flag], nestedNum=[$1.nested2.num])
-+- LogicalTableScan(table=[[default_catalog, default_database, T]])
+LogicalFilter(condition=[>($2, 100)])
++- LogicalProject(id=[$0], name=[$2], valSum=[AS(SUM($1) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'valSum')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[id, deepNested.nested1.name AS nestedName, nested.value AS nestedValue, deepNested.nested2.flag AS nestedFlag, deepNested.nested2.num AS nestedNum])
-+- TableSourceScan(table=[[default_catalog, default_database, T, project=[id, deepNested, nested]]], fields=[id, deepNested, nested])
+Calc(select=[id, name, w0$o0 AS valSum], where=[>(w0$o0, 100)])
++- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS w0$o0])
+   +- Exchange(distribution=[hash[id]])
+      +- Calc(select=[id, val, name, PROCTIME() AS $3])
+         +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProcTimeTableSourceSimple">
+  <TestCase name="testProjectOnlyRowtime">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(proctime=[$3], id=[$0], name=[$2], val=[$1])
-+- LogicalWatermarkAssigner(rowtime=[proctime], watermark=[$3])
-   +- LogicalProject(id=[$0], val=[$1], name=[$2], proctime=[PROCTIME()])
-      +- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
+LogicalProject(rtime=[$1])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
+   +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[proctime, id, name, val])
-+- WatermarkAssigner(rowtime=[proctime], watermark=[proctime])
-   +- Calc(select=[id, val, name, PROCTIME() AS proctime])
-      +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
+Calc(select=[rtime])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+   +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+      +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProjectWithoutRowtime">
+  <TestCase name="testRowTimeTableSourceGroupWindow">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
+LogicalProject(name=[$0], EXPR$0=[$2], EXPR$1=[$1])
++- LogicalWindowAggregate(group=[{3}], EXPR$1=[AVG($2)], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0])
+   +- LogicalFilter(condition=[>($2, 100)])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
+         +- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[name, EXPR$0, EXPR$1])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0], select=[name, AVG(val) AS EXPR$1, end('w) AS EXPR$0])
+   +- Exchange(distribution=[hash[name]])
+      +- Calc(select=[id, rowtime, val, name], where=[>(val, 100)])
+         +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+            +- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testProjectWithoutProctime">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(name=[$3], val=[$2], rtime=[$1], id=[$0])
 +- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
    +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
       +- LogicalTableScan(table=[[default_catalog, default_database, T]])
@@ -75,47 +98,66 @@ LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
+Calc(select=[name, val, rtime, id])
 +- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
    +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
       +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProcTimeTableSourceOverWindow">
+  <TestCase name="testTableSourceWithTimestampRowTimeField">
     <Resource name="planBefore">
       <![CDATA[
-LogicalFilter(condition=[>($2, 100)])
-+- LogicalProject(id=[$0], name=[$2], valSum=[AS(SUM($1) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'valSum')])
-   +- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
+LogicalProject(rowtime=[$1], id=[$0], name=[$3], val=[$2])
++- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[id, name, w0$o0 AS valSum], where=[>(w0$o0, 100)])
-+- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS w0$o0])
-   +- Exchange(distribution=[hash[id]])
-      +- Calc(select=[id, val, name, PROCTIME() AS $3])
-         +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
+Calc(select=[rowtime, id, name, val])
++- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+   +- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testProjectWithRowtimeProctime">
+  <TestCase name="testProjectWithoutRowtime">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(name=[$3], val=[$2], id=[$0])
-+- LogicalWatermarkAssigner(rowtime=[ptime], watermark=[$4])
+LogicalProject(ptime=[$4], name=[$3], val=[$2], id=[$0])
++- LogicalWatermarkAssigner(rowtime=[rtime], watermark=[$1])
    +- LogicalProject(id=[$0], rtime=[$1], val=[$2], name=[$3], ptime=[PROCTIME()])
       +- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[name, val, id])
-+- WatermarkAssigner(rowtime=[ptime], watermark=[ptime])
+Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
++- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
    +- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
       +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testRowTimeTableSourceGroupWindowWithNotNullRowTimeType">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(name=[$0], EXPR$0=[$2], EXPR$1=[$1])
++- LogicalWindowAggregate(group=[{3}], EXPR$1=[AVG($2)], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0])
+   +- LogicalFilter(condition=[>($2, 100)])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 5000:INTERVAL SECOND)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[name, EXPR$0, EXPR$1])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[EXPR$0], select=[name, AVG(val) AS EXPR$1, end('w) AS EXPR$0])
+   +- Exchange(distribution=[hash[name]])
+      +- Calc(select=[id, rowtime, val, name], where=[>(val, 100)])
+         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
+            +- TableSourceScan(table=[[default_catalog, default_database, rowTimeT]], fields=[id, rowtime, val, name])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
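
The regenerated plans above are the observable effect of the fix: a watermark declared in DDL now surfaces as a WatermarkAssigner even when the table is consumed through the Table API, which is why testRowTimeTableSourceGroupWindow could be un-ignored below. A hedged end-to-end sketch ('values' is the planner's test connector; any streaming source behaves the same way):

    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.lit;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.Tumble;

    public class WatermarkFromDdlExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
            tEnv.executeSql(
                "CREATE TABLE rowTimeT (" +
                "  id INT," +
                "  rowtime TIMESTAMP(3)," +
                "  val BIGINT," +
                "  name VARCHAR(32)," +
                "  WATERMARK FOR rowtime AS rowtime" +
                ") WITH ('connector' = 'values', 'bounded' = 'false')");
            // before this fix the window below could not see `rowtime` as a rowtime attribute
            Table t = tEnv.from("rowTimeT")
                .where($("val").isGreater(100))
                .window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"))
                .groupBy($("name"), $("w"))
                .select($("name"), $("w").end(), $("val").avg());
            // the printed plan now contains a WatermarkAssigner(rowtime=[rowtime], ...) node
            System.out.println(t.explain());
        }
    }
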
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index ccc2b55..80cfff0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -1060,7 +1060,7 @@ class TableEnvironmentTest {
         Row.of("f25", "STRING", Boolean.box(false), null, null, null),
         Row.of("f26", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false),
           "PRI(f24, f26)", null, null),
-        Row.of("ts", "TIMESTAMP(3)", Boolean.box(true), null, "TO_TIMESTAMP(`f25`)",
+        Row.of("ts", "TIMESTAMP(3) *ROWTIME*", Boolean.box(true), null, "TO_TIMESTAMP(`f25`)",
           "`ts` - INTERVAL '1' SECOND")
       ).iterator(),
       tableResult1.collect())
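
This expectation changes because DESCRIBE output now carries the rowtime attribute that the resolver derives from the watermark clause. A small sketch of the behavior, with illustrative table and column names:

    // assuming a streaming TableEnvironment `tEnv` as in the earlier sketch
    tEnv.executeSql(
        "CREATE TABLE M (" +
        "  f25 STRING," +
        "  ts AS TO_TIMESTAMP(f25)," +
        "  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND" +
        ") WITH ('connector' = 'values', 'bounded' = 'false')");
    // the `ts` row now reports its type as "TIMESTAMP(3) *ROWTIME*"
    tEnv.executeSql("DESCRIBE M").print();
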
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
index 95c6f34..6347fd8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
@@ -20,13 +20,14 @@ package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.util.MockStreamingRuntimeContext
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, TableSchema}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.data.{GenericRowData, TimestampData}
+import org.apache.flink.table.delegation.Parser
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkPlannerImpl, FlinkTypeFactory, SqlExprToRexConverter, SqlExprToRexConverterFactory}
+import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkContext, FlinkPlannerImpl, FlinkTypeFactory, SqlExprToRexConverter, SqlExprToRexConverterFactory}
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
-import org.apache.flink.table.planner.delegation.PlannerContext
+import org.apache.flink.table.planner.delegation.{ParserImpl, PlannerContext}
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5
 import org.apache.flink.table.runtime.generated.WatermarkGenerator
 import org.apache.flink.table.types.logical.{IntType, TimestampType}
@@ -40,6 +41,7 @@ import org.junit.Test
 
 import java.lang.{Integer => JInt, Long => JLong}
 import java.util.Collections
+import java.util.function.{Function => JFunction, Supplier => JSupplier}
 
 /**
   * Tests the generated [[WatermarkGenerator]] from [[WatermarkGeneratorCodeGenerator]].
@@ -54,17 +56,35 @@ class WatermarkGeneratorCodeGenTest {
     override def create(tableRowType: RelDataType): SqlExprToRexConverter =
       createSqlExprToRexConverter(tableRowType)
   }
+  private val parser: Parser = new ParserImpl(
+    catalogManager,
+    new JSupplier[FlinkPlannerImpl] {
+      override def get(): FlinkPlannerImpl = getPlanner
+    },
+    // we do not cache the parser in order to always use the most up-to-date
+    // configuration: users might change the parser configuration in TableConfig
+    // between parsing statements
+    new JSupplier[CalciteParser] {
+      override def get(): CalciteParser = plannerContext.createCalciteParser()
+    },
+    new JFunction[TableSchema, SqlExprToRexConverter] {
+      override def apply(t: TableSchema): SqlExprToRexConverter = {
+        sqlExprToRexConverterFactory.create(plannerContext.getTypeFactory.buildRelNodeRowType(t))
+      }
+    }
+  )
   val plannerContext = new PlannerContext(
     config,
     functionCatalog,
     catalogManager,
-    asRootSchema(new CatalogManagerCalciteSchema(
-      catalogManager, sqlExprToRexConverterFactory, false)),
+    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)),
     Collections.singletonList(ConventionTraitDef.INSTANCE))
   val planner: FlinkPlannerImpl = plannerContext.createFlinkPlanner(
     catalogManager.getCurrentCatalog,
     catalogManager.getCurrentDatabase)
 
+  def getPlanner: FlinkPlannerImpl = planner
+
   val data = List(
     GenericRowData.of(TimestampData.fromEpochMillis(1000L), JInt.valueOf(5)),
     GenericRowData.of(null, JInt.valueOf(4)),
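
The Scala test has to spell out the SAM types (JSupplier/JFunction) explicitly; from Java the same ParserImpl wiring collapses to lambdas, as in PlannerMocks earlier in this patch. A sketch that assumes the catalogManager, planner, plannerContext and sqlExprToRexConverterFactory fields of the test above:

    Parser parser = new ParserImpl(
        catalogManager,
        () -> planner,
        // the CalciteParser is re-created per statement rather than cached,
        // so configuration changes in TableConfig are picked up
        () -> plannerContext.createCalciteParser(),
        schema -> sqlExprToRexConverterFactory.create(
            plannerContext.getTypeFactory().buildRelNodeRowType(schema)));
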
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
index b5f252b..434adb5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.stream.sql
 
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.planner.utils._
 
 import org.junit.Test
@@ -76,7 +77,9 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
-  def testProcTimeTableSourceSimple(): Unit = {
+  def testProctimeOnWatermarkSpec(): Unit = {
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage("Watermark can not be defined for a processing time attribute column.")
     val ddl =
       s"""
          |CREATE TABLE procTimeT (
@@ -96,27 +99,6 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
-  def testProjectWithRowtimeProctime(): Unit = {
-    val ddl =
-      s"""
-         |CREATE TABLE T (
-         |  id int,
-         |  rtime timestamp(3),
-         |  val bigint,
-         |  name varchar(32),
-         |  ptime as PROCTIME(),
-         |  watermark for ptime as ptime
-         |) WITH (
-         |  'connector' = 'values',
-         |  'bounded' = 'false'
-         |)
-       """.stripMargin
-    util.tableEnv.executeSql(ddl)
-
-    util.verifyPlan("SELECT name, val, id FROM T")
-  }
-
-  @Test
   def testProjectWithoutRowtime(): Unit = {
     val ddl =
       s"""
@@ -137,6 +119,7 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan("SELECT ptime, name, val, id FROM T")
   }
 
+  @Test
   def testProjectWithoutProctime(): Unit = {
     val ddl =
       s"""
@@ -157,26 +140,7 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan("select name, val, rtime, id from T")
   }
 
-  def testProjectOnlyProctime(): Unit = {
-    val ddl =
-      s"""
-         |CREATE TABLE T (
-         |  id int,
-         |  rtime timestamp(3),
-         |  val bigint,
-         |  name varchar(32),
-         |  ptime as PROCTIME(),
-         |  watermark for ptime as ptime
-         |) WITH (
-         |  'connector' = 'values',
-         |  'bounded' = 'false'
-         |)
-       """.stripMargin
-    util.tableEnv.executeSql(ddl)
-
-    util.verifyPlan("SELECT ptime FROM T")
-  }
-
+  @Test
   def testProjectOnlyRowtime(): Unit = {
     val ddl =
       s"""
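
The renamed test pins down the new validation rule: a watermark may only be declared on a rowtime column, and one declared on a PROCTIME() computed column is rejected once the schema is resolved. A hedged sketch of the scenario; judging by the test structure, the error surfaces when the table is used, not when the DDL is executed:

    // assuming a streaming TableEnvironment `tEnv` as in the earlier sketch;
    // ValidationException is org.apache.flink.table.api.ValidationException
    tEnv.executeSql(
        "CREATE TABLE procTimeT (" +
        "  id INT," +
        "  val BIGINT," +
        "  name VARCHAR(32)," +
        "  pTime AS PROCTIME()," +
        "  WATERMARK FOR pTime AS pTime" +
        ") WITH ('connector' = 'values', 'bounded' = 'false')");
    try {
        tEnv.sqlQuery("SELECT pTime, id, name, val FROM procTimeT");
    } catch (ValidationException e) {
        // "Watermark can not be defined for a processing time attribute column."
    }
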
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
index 62112cf..72d2c11 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala
@@ -179,6 +179,7 @@ class LegacyTableSourceTest extends TableTestBase {
     util.verifyPlan(t)
   }
 
+  @Test
   def testProjectWithoutProctime(): Unit = {
     val tableSchema = new TableSchema(
       Array("id", "rtime", "val", "ptime", "name"),
@@ -198,6 +199,7 @@ class LegacyTableSourceTest extends TableTestBase {
     util.verifyPlan(t)
   }
 
+  @Test
   def testProjectOnlyProctime(): Unit = {
     val tableSchema = new TableSchema(
       Array("id", "rtime", "val", "ptime", "name"),
@@ -217,6 +219,7 @@ class LegacyTableSourceTest extends TableTestBase {
     util.verifyPlan(t)
   }
 
+  @Test
   def testProjectOnlyRowtime(): Unit = {
     val tableSchema = new TableSchema(
       Array("id", "rtime", "val", "ptime", "name"),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
index 36b273d..c97c863 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.stream.table
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.utils.TableTestBase
 
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 class TableSourceTest extends TableTestBase {
 
@@ -48,7 +48,6 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan(t)
   }
 
-  @Ignore("remove ignore once FLINK-17753 is fixed")
   @Test
   def testRowTimeTableSourceGroupWindow(): Unit = {
     val ddl =
@@ -75,15 +74,15 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
-  def testProcTimeTableSourceSimple(): Unit = {
+  def testRowTimeTableSourceGroupWindowWithNotNullRowTimeType(): Unit = {
     val ddl =
       s"""
-         |CREATE TABLE procTimeT (
+         |CREATE TABLE rowTimeT (
          |  id int,
+         |  rowtime timestamp(3) not null,
          |  val bigint,
          |  name varchar(32),
-         |  proctime as PROCTIME(),
-         |  watermark for proctime as proctime
+         |  watermark for rowtime as rowtime - INTERVAL '5' SECONDS
          |) WITH (
          |  'connector' = 'values',
          |  'bounded' = 'false'
@@ -91,7 +90,11 @@ class TableSourceTest extends TableTestBase {
        """.stripMargin
     util.tableEnv.executeSql(ddl)
 
-    val t = util.tableEnv.from("procTimeT").select($"proctime", $"id", $"name", $"val")
+    val t = util.tableEnv.from("rowTimeT")
+      .where($"val" > 100)
+      .window(Tumble over 10.minutes on 'rowtime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.end, 'val.avg)
     util.verifyPlan(t)
   }
 
@@ -119,28 +122,6 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
-  def testProjectWithRowtimeProctime(): Unit = {
-    val ddl =
-      s"""
-         |CREATE TABLE T (
-         |  id int,
-         |  rtime timestamp(3),
-         |  val bigint,
-         |  name varchar(32),
-         |  ptime as PROCTIME(),
-         |  watermark for ptime as ptime
-         |) WITH (
-         |  'connector' = 'values',
-         |  'bounded' = 'false'
-         |)
-       """.stripMargin
-    util.tableEnv.executeSql(ddl)
-
-    val t = util.tableEnv.from("T").select('name, 'val, 'id)
-    util.verifyPlan(t)
-  }
-
-  @Test
   def testProjectWithoutRowtime(): Unit = {
     val ddl =
       s"""
@@ -162,6 +143,7 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan(t)
   }
 
+  @Test
   def testProjectWithoutProctime(): Unit = {
     val ddl =
       s"""
@@ -183,7 +165,11 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan(t)
   }
 
-  def testProjectOnlyProctime(): Unit = {
+  @Test
+  def testProctimeOnWatermarkSpec(): Unit = {
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage("Watermark can not be defined for a processing time attribute column.")
+
     val ddl =
       s"""
          |CREATE TABLE T (
@@ -204,6 +190,7 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan(t)
   }
 
+  @Test
   def testProjectOnlyRowtime(): Unit = {
     val ddl =
       s"""
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
index 402a3f4..17fd3e0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
@@ -19,11 +19,13 @@
 package org.apache.flink.table.planner;
 
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.CalciteParser;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.sqlexec.SqlToOperationConverter;
 
@@ -78,4 +80,9 @@ public class ParserImpl implements Parser {
 		SqlIdentifier sqlIdentifier = parser.parseIdentifier(identifier);
 		return UnresolvedIdentifier.of(sqlIdentifier.names);
 	}
+
+	@Override
+	public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+		throw new UnsupportedOperationException();
+	}
 }
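
Only the Blink planner implements the new parseSqlExpression method; the legacy planner keeps it as a stub, so resolving watermark expressions from DDL effectively requires the Blink planner. An illustrative defensive caller (not part of this patch):

    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.delegation.Parser;
    import org.apache.flink.table.expressions.ResolvedExpression;

    final class WatermarkExprs {
        // Returns null when the active planner cannot resolve SQL expressions,
        // as the legacy ParserImpl above always throws UnsupportedOperationException.
        static ResolvedExpression tryResolve(Parser parser, String sqlExpr, TableSchema schema) {
            try {
                return parser.parseSqlExpression(sqlExpr, schema);
            } catch (UnsupportedOperationException e) {
                return null;
            }
        }
    }
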
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 5d5fc12..905a031 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -127,6 +127,8 @@ abstract class TableEnvImpl(
     }
   )
 
+  catalogManager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(parser, false))
+
   def getConfig: TableConfig = config
 
   private val UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
index 23bcd94..27f6aeb 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.utils.ParserMock;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Rule;
@@ -110,6 +112,7 @@ public class CatalogManagerTest extends TestLogger {
 			.builtin(
 				database(BUILTIN_DEFAULT_DATABASE_NAME))
 			.build();
+		manager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(new ParserMock(), true));
 
 		CatalogTest.TestTable table = new CatalogTest.TestTable();
 		manager.createTemporaryTable(table, tempIdentifier, true);
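
Tests that only exercise catalog bookkeeping do not need a working parser; they just need something that satisfies the new CatalogTableSchemaResolver constructor. A sketch of what such a stub can look like (the actual ParserMock lives in the planner's test utilities and may differ):

    import java.util.List;

    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.catalog.UnresolvedIdentifier;
    import org.apache.flink.table.delegation.Parser;
    import org.apache.flink.table.expressions.ResolvedExpression;
    import org.apache.flink.table.operations.Operation;

    public class NoOpParser implements Parser {
        @Override
        public List<Operation> parse(String statement) {
            throw new UnsupportedOperationException("not needed by these tests");
        }

        @Override
        public UnresolvedIdentifier parseIdentifier(String identifier) {
            throw new UnsupportedOperationException("not needed by these tests");
        }

        @Override
        public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
            // pretend no expression can be resolved; enough for pure catalog tests
            return null;
        }
    }
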
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
index a850f48..a7481c7 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
@@ -20,11 +20,13 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.TestExternalTableSourceFactory.TestExternalTableSource;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.plan.schema.TableSourceTable;
 import org.apache.flink.table.utils.CatalogManagerMocks;
+import org.apache.flink.table.utils.ParserMock;
 
 import org.apache.calcite.schema.Table;
 import org.junit.Test;
@@ -53,7 +55,10 @@ public class DatabaseCalciteSchemaTest {
 		CatalogManager catalogManager = CatalogManagerMocks.preparedCatalogManager()
 			.defaultCatalog(catalogName, catalog)
 			.build();
-		DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(true,
+		catalogManager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(new ParserMock(), true));
+
+		DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(
+			true,
 			databaseName,
 			catalogName,
 			catalogManager,
@@ -78,6 +83,10 @@ public class DatabaseCalciteSchemaTest {
 			return this;
 		}
 
+		public CatalogTable copy(TableSchema tableSchema) {
+			return this;
+		}
+
 		@Override
 		public Map<String, String> toProperties() {
 			Map<String, String> properties = new HashMap<>();
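
The extra copy(TableSchema) on this test table mirrors what the resolver needs: after resolving computed columns and watermarks, it hands back a copy of the catalog table that carries the resolved schema. The test can simply return this because it never inspects the copied schema; for a CatalogTableImpl-style table the method would plausibly look like this sketch:

    public CatalogTable copy(TableSchema tableSchema) {
        return new CatalogTableImpl(
            tableSchema, getPartitionKeys(), getProperties(), getComment());
    }
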
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index cd981cb..2bd9347 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.calcite.CalciteParser;
 import org.apache.flink.table.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.catalog.Catalog;
@@ -58,6 +59,7 @@ import org.apache.flink.table.planner.PlanningConfigurationBuilder;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.table.utils.CatalogManagerMocks;
+import org.apache.flink.table.utils.ParserMock;
 
 import org.apache.calcite.sql.SqlNode;
 import org.junit.After;
@@ -103,6 +105,7 @@ public class SqlToOperationConverterTest {
 
 	@Before
 	public void before() throws TableAlreadyExistException, DatabaseNotExistException {
+		catalogManager.setCatalogTableSchemaResolver(new CatalogTableSchemaResolver(new ParserMock(), true));
 		final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
 		final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
 		final TableSchema tableSchema = TableSchema.builder()