You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/11 07:06:47 UTC

[flink] branch release-1.11 updated: [FLINK-17113][sql-cli] Use executeSql to execute view statements and fix nullability loss problem

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 6979559  [FLINK-17113][sql-cli] Use executeSql to execute view statements and fix nullability loss problem
6979559 is described below

commit 6979559ebd82364fdc86887ccd2d267c9d79740f
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Wed Jun 3 15:07:12 2020 +0800

    [FLINK-17113][sql-cli] Use executeSql to execute view statements and fix nullability loss problem
    
    This closes #12456
---
 .../apache/flink/table/client/cli/CliClient.java   | 45 +---------------------
 .../apache/flink/table/client/cli/CliStrings.java  |  7 ----
 .../flink/table/client/cli/SqlCommandParser.java   |  4 --
 .../flink/table/client/gateway/Executor.java       | 29 --------------
 .../client/gateway/local/ExecutionContext.java     |  6 +--
 .../table/client/gateway/local/LocalExecutor.java  | 43 ---------------------
 .../flink/table/client/cli/CliClientTest.java      | 16 --------
 .../flink/table/client/cli/CliResultViewTest.java  | 16 --------
 .../table/client/cli/SqlCommandParserTest.java     |  7 ++--
 .../flink/table/client/cli/TestingExecutor.java    | 14 -------
 .../client/gateway/local/LocalExecutorITCase.java  | 34 +++++++++++-----
 .../flink/table/calcite/FlinkTypeFactory.scala     | 42 +++++++++++++++++++-
 .../flink/table/catalog/ViewExpansionTest.java     |  4 +-
 13 files changed, 74 insertions(+), 193 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index b93098b..5b0e197 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommandCall;
-import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
@@ -322,10 +321,10 @@ public class CliClient {
 				callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_REMOVED);
 				break;
 			case CREATE_VIEW:
-				callCreateView(cmdCall);
+				callDdl(cmdCall.operands[0], CliStrings.MESSAGE_VIEW_CREATED);
 				break;
 			case DROP_VIEW:
-				callDropView(cmdCall);
+				callDdl(cmdCall.operands[0], CliStrings.MESSAGE_VIEW_REMOVED);
 				break;
 			case CREATE_FUNCTION:
 				callDdl(cmdCall.operands[0], CliStrings.MESSAGE_FUNCTION_CREATED);
@@ -605,46 +604,6 @@ public class CliClient {
 		return true;
 	}
 
-	private void callCreateView(SqlCommandCall cmdCall) {
-		final String name = cmdCall.operands[0];
-		final String query = cmdCall.operands[1];
-
-		final ViewEntry previousView = executor.listViews(sessionId).get(name);
-		if (previousView != null) {
-			printExecutionError(CliStrings.MESSAGE_VIEW_ALREADY_EXISTS);
-			return;
-		}
-
-		try {
-			// perform and validate change
-			executor.addView(sessionId, name, query);
-			printInfo(CliStrings.MESSAGE_VIEW_CREATED);
-		} catch (SqlExecutionException e) {
-			// rollback change
-			executor.removeView(sessionId, name);
-			printExecutionException(e);
-		}
-	}
-
-	private void callDropView(SqlCommandCall cmdCall) {
-		final String name = cmdCall.operands[0];
-		final ViewEntry view = executor.listViews(sessionId).get(name);
-		if (view == null) {
-			printExecutionError(CliStrings.MESSAGE_VIEW_NOT_FOUND);
-			return;
-		}
-
-		try {
-			// perform and validate change
-			executor.removeView(sessionId, name);
-			printInfo(CliStrings.MESSAGE_VIEW_REMOVED);
-		} catch (SqlExecutionException e) {
-			// rollback change
-			executor.addView(sessionId, view.getName(), view.getQuery());
-			printExecutionException(CliStrings.MESSAGE_VIEW_NOT_REMOVED, e);
-		}
-	}
-
 	private void callSource(SqlCommandCall cmdCall) {
 		final String pathString = cmdCall.operands[0];
 
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index 4d643e2..7fb90e6 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -166,13 +166,6 @@ public final class CliStrings {
 
 	public static final String MESSAGE_CATALOG_REMOVED = "Catalog has been removed.";
 
-	public static final String MESSAGE_VIEW_ALREADY_EXISTS = "A view with this name has already been defined in the current CLI session.";
-
-	public static final String MESSAGE_VIEW_NOT_FOUND = "The given view does not exist in the current CLI session. " +
-		"Only views created with a CREATE VIEW statement can be accessed.";
-
-	public static final String MESSAGE_VIEW_NOT_REMOVED = "The given view cannot be removed without affecting other views.";
-
 	// --------------------------------------------------------------------------------------------
 
 	public static final String RESULT_TITLE = "SQL Query Result";
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index d4c891c..c01d53c 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -116,12 +116,8 @@ public final class SqlCommandParser {
 			cmd = SqlCommand.ALTER_TABLE;
 		} else if (operation instanceof CreateViewOperation) {
 			cmd = SqlCommand.CREATE_VIEW;
-			CreateViewOperation op = (CreateViewOperation) operation;
-			operands = new String[] { op.getViewIdentifier().asSerializableString(),
-					op.getCatalogView().getOriginalQuery() };
 		} else if (operation instanceof DropViewOperation) {
 			cmd = SqlCommand.DROP_VIEW;
-			operands = new String[] { ((DropViewOperation) operation).getViewIdentifier().asSerializableString() };
 		} else if (operation instanceof CreateDatabaseOperation) {
 			cmd = SqlCommand.CREATE_DATABASE;
 		} else if (operation instanceof DropDatabaseOperation) {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index f35a77a..ed433f5 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.client.gateway;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.types.Row;
 
@@ -78,34 +77,6 @@ public interface Executor {
 	void setSessionProperty(String sessionId, String key, String value) throws SqlExecutionException;
 
 	/**
-	 * Add a new view to the given session.
-	 *
-	 * @param sessionId to identify the session.
-	 * @param name      of the view.
-	 * @param query     to represent the view.
-	 * @throws SqlExecutionException
-	 */
-	void addView(String sessionId, String name, String query) throws SqlExecutionException;
-
-	/**
-	 * Remove the view with given name for the given session.
-	 *
-	 * @param sessionId to identify the session.
-	 * @param name      of the view.
-	 * @throws SqlExecutionException
-	 */
-	void removeView(String sessionId, String name) throws SqlExecutionException;
-
-	/**
-	 * Lists all registered views for the given session.
-	 *
-	 * @param sessionId to identify the session.
-	 * @return list of view in the given session.
-	 * @throws SqlExecutionException
-	 */
-	Map<String, ViewEntry> listViews(String sessionId) throws SqlExecutionException;
-
-	/**
 	 * Lists all registered catalogs.
 	 */
 	List<String> listCatalogs(String sessionid) throws SqlExecutionException;
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index a13eab7..afe9a06 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -603,7 +603,7 @@ public class ExecutionContext<ClusterID> {
 			// it means that it accesses tables that are not available anymore
 			if (entry instanceof ViewEntry) {
 				final ViewEntry viewEntry = (ViewEntry) entry;
-				registerView(viewEntry);
+				registerTemporaryView(viewEntry);
 			}
 		});
 
@@ -695,9 +695,9 @@ public class ExecutionContext<ClusterID> {
 		}
 	}
 
-	private void registerView(ViewEntry viewEntry) {
+	private void registerTemporaryView(ViewEntry viewEntry) {
 		try {
-			tableEnv.registerTable(viewEntry.getName(), tableEnv.sqlQuery(viewEntry.getQuery()));
+			tableEnv.createTemporaryView(viewEntry.getName(), tableEnv.sqlQuery(viewEntry.getQuery()));
 		} catch (Exception e) {
 			throw new SqlExecutionException(
 				"Invalid view '" + viewEntry.getName() + "' with query:\n" + viewEntry.getQuery()
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 665e85d..f778522 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -45,8 +45,6 @@ import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
-import org.apache.flink.table.client.config.entries.TableEntry;
-import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
@@ -305,47 +303,6 @@ public class LocalExecutor implements Executor {
 	}
 
 	@Override
-	public void addView(String sessionId, String name, String query) throws SqlExecutionException {
-		ExecutionContext<?> context = getExecutionContext(sessionId);
-		TableEnvironment tableEnv = context.getTableEnvironment();
-		tableEnv.createTemporaryView(name, tableEnv.sqlQuery(query));
-		// Also attach the view to ExecutionContext#environment.
-		context.getEnvironment().getTables().put(name, ViewEntry.create(name, query));
-	}
-
-	@Override
-	public void removeView(String sessionId, String name) throws SqlExecutionException {
-		// Here we rebuild the ExecutionContext because we want to ensure that all the remaining views can work fine.
-		// Assume the case:
-		//   view1=select 1;
-		//   view2=select * from view1;
-		// If we delete view1 successfully, then query view2 will throw exception because view1 does not exist. we want
-		// all the remaining views are OK, so do the ExecutionContext rebuilding to avoid breaking the view dependency.
-		ExecutionContext<?> context = getExecutionContext(sessionId);
-		Environment env = context.getEnvironment();
-		Environment newEnv = env.clone();
-		if (newEnv.getTables().remove(name) != null) {
-			// Renew the ExecutionContext.
-			this.contextMap.put(
-					sessionId,
-					createExecutionContextBuilder(context.getOriginalSessionContext())
-							.env(newEnv).build());
-		}
-	}
-
-	@Override
-	public Map<String, ViewEntry> listViews(String sessionId) throws SqlExecutionException {
-		Map<String, ViewEntry> views = new HashMap<>();
-		Map<String, TableEntry> tables = getExecutionContext(sessionId).getEnvironment().getTables();
-		for (Map.Entry<String, TableEntry> entry : tables.entrySet()) {
-			if (entry.getValue() instanceof ViewEntry) {
-				views.put(entry.getKey(), (ViewEntry) entry.getValue());
-			}
-		}
-		return views;
-	}
-
-	@Override
 	public List<String> listCatalogs(String sessionId) throws SqlExecutionException {
 		final ExecutionContext<?> context = getExecutionContext(sessionId);
 		final TableEnvironment tableEnv = context.getTableEnvironment();
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index 38f554f..28c5c8d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.client.cli.utils.SqlParserHelper;
 import org.apache.flink.table.client.cli.utils.TerminalUtils;
 import org.apache.flink.table.client.cli.utils.TerminalUtils.MockOutputStream;
 import org.apache.flink.table.client.config.Environment;
-import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
@@ -394,21 +393,6 @@ public class CliClientTest extends TestLogger {
 		}
 
 		@Override
-		public void addView(String sessionId, String name, String query) throws SqlExecutionException {
-
-		}
-
-		@Override
-		public void removeView(String sessionId, String name) throws SqlExecutionException {
-
-		}
-
-		@Override
-		public Map<String, ViewEntry> listViews(String sessionId) throws SqlExecutionException {
-			return null;
-		}
-
-		@Override
 		public List<String> listCatalogs(String sessionId) throws SqlExecutionException {
 			return null;
 		}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
index b2e4627..e78928c 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.client.cli.utils.TerminalUtils;
 import org.apache.flink.table.client.config.Environment;
-import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
@@ -157,21 +156,6 @@ public class CliResultViewTest {
 		}
 
 		@Override
-		public void addView(String sessionId, String name, String query) throws SqlExecutionException {
-
-		}
-
-		@Override
-		public void removeView(String sessionId, String name) throws SqlExecutionException {
-
-		}
-
-		@Override
-		public Map<String, ViewEntry> listViews(String sessionId) throws SqlExecutionException {
-			return null;
-		}
-
-		@Override
 		public List<String> listCatalogs(String sessionId) throws SqlExecutionException {
 			return null;
 		}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index f5ca991..a8d2a8e 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -112,18 +112,17 @@ public class SqlCommandParserTest {
 				// create view xx
 				TestItem.validSql("CREATE VIEW x AS SELECT 1+1",
 						SqlCommand.CREATE_VIEW,
-						"`default_catalog`.`default_database`.`x`", "SELECT 1 + 1"),
+						"CREATE VIEW x AS SELECT 1+1"),
 				TestItem.validSql("CREATE   VIEW    x   AS     SELECT 1+1 FROM MyTable",
 						SqlCommand.CREATE_VIEW,
-						"`default_catalog`.`default_database`.`x`",
-						"SELECT 1 + 1\nFROM `default_catalog`.`default_database`.`MyTable` AS `MyTable`"),
+						"CREATE   VIEW    x   AS     SELECT 1+1 FROM MyTable"),
 				TestItem.invalidSql("CREATE VIEW x SELECT 1+1 ", // missing AS
 						SqlExecutionException.class,
 						"Encountered \"SELECT\""),
 				// drop view xx
 				TestItem.validSql("DROP VIEW TestView1",
 						SqlCommand.DROP_VIEW,
-						"`default_catalog`.`default_database`.`TestView1`"),
+						"DROP VIEW TestView1"),
 				TestItem.invalidSql("DROP VIEW ", // missing name
 						SqlExecutionException.class,
 						"Encountered \"<EOF>\""),
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
index 2635d4f..a26e616 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.cli.utils.SqlParserHelper;
-import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
@@ -156,19 +155,6 @@ class TestingExecutor implements Executor {
 	}
 
 	@Override
-	public void addView(String sessionId, String name, String query) throws SqlExecutionException {
-	}
-
-	@Override
-	public void removeView(String sessionId, String name) throws SqlExecutionException {
-	}
-
-	@Override
-	public Map<String, ViewEntry> listViews(String sessionId) throws SqlExecutionException {
-		throw new UnsupportedOperationException("Not implemented.");
-	}
-
-	@Override
 	public List<String> listCatalogs(String sessionid) throws SqlExecutionException {
 		throw new UnsupportedOperationException("Not implemented.");
 	}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index f7b56d4..d4c7fac 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -154,8 +154,19 @@ public class LocalExecutorITCase extends TestLogger {
 		String sessionId = executor.openSession(session);
 		assertEquals("test-session", sessionId);
 
-		executor.addView(sessionId, "AdditionalView1", "SELECT 1");
-		executor.addView(sessionId, "AdditionalView2", "SELECT * FROM AdditionalView1");
+		executor.executeSql(sessionId,
+				"CREATE TEMPORARY VIEW IF NOT EXISTS AdditionalView1 AS SELECT 1");
+		try {
+			executor.executeSql(sessionId,
+					"CREATE TEMPORARY VIEW AdditionalView1 AS SELECT 2");
+			fail("unexpected exception");
+		} catch (Exception var1) {
+			assertThat(var1.getCause().getMessage(),
+					is("Temporary table '`default_catalog`.`default_database`.`AdditionalView1`' already exists"));
+		}
+		executor.executeSql(sessionId, "CREATE VIEW AdditionalView1 AS SELECT 2");
+		executor.executeSql(sessionId,
+				"CREATE TEMPORARY VIEW IF NOT EXISTS AdditionalView2 AS SELECT * FROM AdditionalView1");
 
 		List<String> actualTables = executor.listTables(sessionId);
 		List<String> expectedTables = Arrays.asList(
@@ -168,16 +179,19 @@ public class LocalExecutorITCase extends TestLogger {
 				"TestView2");
 		assertEquals(expectedTables, actualTables);
 
+		// AdditionalView2 depending on AdditionalView1 does not block dropping AdditionalView1; the first
+		// DROP VIEW below throws only because a temporary view with the same identifier still exists.
 		try {
-			executor.removeView(sessionId, "AdditionalView1");
-			fail();
-		} catch (SqlExecutionException e) {
-			// AdditionalView2 needs AdditionalView1
+			executor.executeSql(sessionId, "DROP VIEW AdditionalView1");
+			fail("unexpected exception");
+		} catch (Exception var1) {
+			assertThat(var1.getCause().getMessage(),
+					is("Temporary view with identifier '`default_catalog`.`default_database`.`AdditionalView1`' exists. "
+							+ "Drop it first before removing the permanent view."));
 		}
-
-		executor.removeView(sessionId, "AdditionalView2");
-
-		executor.removeView(sessionId, "AdditionalView1");
+		executor.executeSql(sessionId, "DROP TEMPORARY VIEW AdditionalView1");
+		executor.executeSql(sessionId, "DROP VIEW AdditionalView1");
+		executor.executeSql(sessionId, "DROP TEMPORARY VIEW AdditionalView2");
 
 		actualTables = executor.listTables(sessionId);
 		expectedTables = Arrays.asList(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index f50938c..763c83d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.calcite
 
 import java.util
 import java.nio.charset.Charset
-
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`._
@@ -37,6 +36,9 @@ import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, Objec
 import org.apache.flink.table.api.{TableException, TableSchema}
 import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
 import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.logical.{LogicalType, TimestampKind, TimestampType}
+import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo}
 import org.apache.flink.types.Row
@@ -180,7 +182,37 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
     * @return a struct type with the input fieldNames, input fieldTypes, and system fields
     */
   def buildLogicalRowType(tableSchema: TableSchema): RelDataType = {
-    buildLogicalRowType(tableSchema.getFieldNames, tableSchema.getFieldTypes)
+    buildLogicalRowType(tableSchema.getFieldNames, tableSchema.getFieldDataTypes)
+  }
+
+  /**
+   * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
+   *
+   * @param fieldNames field names
+   * @param fieldTypes field types, every element is Flink's [[DataType]]
+   * @return a struct type with the input fieldNames, input fieldTypes, and system fields
+   */
+  def buildLogicalRowType(
+    fieldNames: Array[String],
+    fieldTypes: Array[DataType])
+  : RelDataType = {
+    val logicalRowTypeBuilder = builder
+
+    val fields = fieldNames.zip(fieldTypes)
+    fields.foreach(f => {
+      // time indicators are not nullable
+      val logicalType = f._2.getLogicalType
+      val nullable  = if (FlinkTypeFactory.isTimeIndicatorType(logicalType)) {
+        false
+      } else {
+        logicalType.isNullable
+      }
+
+      logicalRowTypeBuilder.add(f._1,
+        createTypeFromTypeInfo(TypeConversions.fromDataTypeToLegacyInfo(f._2), nullable))
+    })
+
+    logicalRowTypeBuilder.build
   }
 
   /**
@@ -390,6 +422,12 @@ object FlinkTypeFactory {
     case _ => false
   }
 
+  def isTimeIndicatorType(t: LogicalType): Boolean = t match {
+    case t: TimestampType
+      if t.getKind == TimestampKind.ROWTIME || t.getKind == TimestampKind.PROCTIME => true
+    case _ => false
+  }
+
   def isTimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
     case ti: TimeIndicatorRelDataType => true
     case _ => false
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/ViewExpansionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/ViewExpansionTest.java
index a036495..d6c26ff 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/ViewExpansionTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/ViewExpansionTest.java
@@ -87,7 +87,7 @@ public class ViewExpansionTest {
 			).build();
 
 		StreamTableTestUtil util = new StreamTableTestUtil(new Some<>(catalogManager));
-		final String expected = "DataStreamCalc(select=[a, b, CAST(EXPR$2) AS c])\n"
+		final String expected = "DataStreamCalc(select=[CAST(a) AS a, b, CAST(EXPR$2) AS c])\n"
 			+ "DataStreamGroupAggregate(groupBy=[a, b], select=[a, b, COUNT(c) AS EXPR$2])\n"
 			+ "StreamTableSourceScan(table=[[builtin, default, tab1]], fields=[a, b, c], source=[isTemporary=[false]])";
 		util.verifyJavaSql(
@@ -140,7 +140,7 @@ public class ViewExpansionTest {
 
 		StreamTableTestUtil util = new StreamTableTestUtil(new Some<>(catalogManager));
 		Table tab = util.javaTableEnv().scan("builtin", "default", "view").select($("*"));
-		final String expected = "DataStreamCalc(select=[a, b, CAST(EXPR$2) AS c])\n"
+		final String expected = "DataStreamCalc(select=[CAST(a) AS a, b, CAST(EXPR$2) AS c])\n"
 			+ "DataStreamGroupAggregate(groupBy=[a, b], select=[a, b, COUNT(c) AS EXPR$2])\n"
 			+ "StreamTableSourceScan(table=[[builtin, default, tab1]], fields=[a, b, c], source=[isTemporary=[false]])";
 		util.verifyJavaTable(