You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/09/11 18:35:18 UTC

[flink] branch release-1.9 updated: [FLINK-13653][sql-client] ResultStore should avoid using RowTypeInfo when creating a result

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new f11cdd3  [FLINK-13653][sql-client] ResultStore should avoid using RowTypeInfo when creating a result
f11cdd3 is described below

commit f11cdd3bed438d46583368284afce74bc4ecd703
Author: Rui Li <li...@apache.org>
AuthorDate: Tue Aug 13 20:57:18 2019 +0800

    [FLINK-13653][sql-client] ResultStore should avoid using RowTypeInfo when creating a result
    
    Fix the issue that types with parameters, e.g. decimal, cannot be accessed via SQL client.
    
    This closes #9432.
---
 .../gateway/local/CollectBatchTableSink.java       | 28 +++++---------
 .../gateway/local/CollectStreamTableSink.java      | 24 ++++--------
 .../table/client/gateway/local/ResultStore.java    |  5 ++-
 .../local/result/ChangelogCollectStreamResult.java |  5 ++-
 .../gateway/local/result/CollectStreamResult.java  |  6 +--
 .../result/MaterializedCollectBatchResult.java     |  6 +--
 .../result/MaterializedCollectStreamResult.java    |  6 ++-
 .../table/client/gateway/local/DependencyTest.java | 13 +++++++
 .../client/gateway/local/LocalExecutorITCase.java  | 43 ++++++++++++++++++++--
 .../MaterializedCollectStreamResultTest.java       | 13 +++++++
 .../src/test/resources/test-data-1.csv             | 18 +++++++++
 .../test/resources/test-sql-client-catalogs.yaml   | 21 +++++++++++
 12 files changed, 139 insertions(+), 49 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
index 9dccfb7..1e3ca21 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
@@ -20,12 +20,13 @@ package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.BatchTableSink;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 /**
@@ -35,13 +36,12 @@ public class CollectBatchTableSink extends OutputFormatTableSink<Row> implements
 
 	private final String accumulatorName;
 	private final TypeSerializer<Row> serializer;
+	private final TableSchema tableSchema;
 
-	private String[] fieldNames;
-	private TypeInformation<?>[] fieldTypes;
-
-	public CollectBatchTableSink(String accumulatorName, TypeSerializer<Row> serializer) {
+	public CollectBatchTableSink(String accumulatorName, TypeSerializer<Row> serializer, TableSchema tableSchema) {
 		this.accumulatorName = accumulatorName;
 		this.serializer = serializer;
+		this.tableSchema = tableSchema;
 	}
 
 	/**
@@ -52,26 +52,18 @@ public class CollectBatchTableSink extends OutputFormatTableSink<Row> implements
 	}
 
 	@Override
-	public TypeInformation<Row> getOutputType() {
-		return Types.ROW_NAMED(fieldNames, fieldTypes);
-	}
-
-	@Override
-	public String[] getFieldNames() {
-		return fieldNames;
+	public DataType getConsumedDataType() {
+		return getTableSchema().toRowDataType();
 	}
 
 	@Override
-	public TypeInformation<?>[] getFieldTypes() {
-		return fieldTypes;
+	public TableSchema getTableSchema() {
+		return tableSchema;
 	}
 
 	@Override
 	public CollectBatchTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-		final CollectBatchTableSink copy = new CollectBatchTableSink(accumulatorName, serializer);
-		copy.fieldNames = fieldNames;
-		copy.fieldTypes = fieldTypes;
-		return copy;
+		return new CollectBatchTableSink(accumulatorName, serializer, tableSchema);
 	}
 
 	@Override
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
index ce8565a..63adb07 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.types.Row;
 
@@ -39,37 +40,28 @@ public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
 	private final InetAddress targetAddress;
 	private final int targetPort;
 	private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+	private final TableSchema tableSchema;
 
-	private String[] fieldNames;
-	private TypeInformation<?>[] fieldTypes;
-
-	public CollectStreamTableSink(InetAddress targetAddress, int targetPort, TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+	public CollectStreamTableSink(InetAddress targetAddress, int targetPort, TypeSerializer<Tuple2<Boolean, Row>> serializer, TableSchema tableSchema) {
 		this.targetAddress = targetAddress;
 		this.targetPort = targetPort;
 		this.serializer = serializer;
+		this.tableSchema = tableSchema;
 	}
 
 	@Override
-	public String[] getFieldNames() {
-		return fieldNames;
-	}
-
-	@Override
-	public TypeInformation<?>[] getFieldTypes() {
-		return fieldTypes;
+	public TableSchema getTableSchema() {
+		return tableSchema;
 	}
 
 	@Override
 	public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-		final CollectStreamTableSink copy = new CollectStreamTableSink(targetAddress, targetPort, serializer);
-		copy.fieldNames = fieldNames;
-		copy.fieldTypes = fieldTypes;
-		return copy;
+		return new CollectStreamTableSink(targetAddress, targetPort, serializer, tableSchema);
 	}
 
 	@Override
 	public TypeInformation<Row> getRecordType() {
-		return Types.ROW_NAMED(fieldNames, fieldTypes);
+		return getTableSchema().toRowType();
 	}
 
 	@Override
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index c3cc12b..27a5278 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -67,10 +67,11 @@ public class ResultStore {
 			final int gatewayPort = getGatewayPort(env.getDeployment());
 
 			if (env.getExecution().isChangelogMode()) {
-				return new ChangelogCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
+				return new ChangelogCollectStreamResult<>(outputType, schema, config, gatewayAddress, gatewayPort);
 			} else {
 				return new MaterializedCollectStreamResult<>(
 					outputType,
+					schema,
 					config,
 					gatewayAddress,
 					gatewayPort,
@@ -82,7 +83,7 @@ public class ResultStore {
 			if (!env.getExecution().isTableMode()) {
 				throw new SqlExecutionException("Results of batch queries can only be served in table mode.");
 			}
-			return new MaterializedCollectBatchResult<>(outputType, config);
+			return new MaterializedCollectBatchResult<>(schema, outputType, config);
 		}
 	}
 
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
index 38e9126..43f2de5 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.client.gateway.local.result;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.types.Row;
 
@@ -38,9 +39,9 @@ public class ChangelogCollectStreamResult<C> extends CollectStreamResult<C> impl
 	private List<Tuple2<Boolean, Row>> changeRecordBuffer;
 	private static final int CHANGE_RECORD_BUFFER_SIZE = 5_000;
 
-	public ChangelogCollectStreamResult(RowTypeInfo outputType, ExecutionConfig config,
+	public ChangelogCollectStreamResult(RowTypeInfo outputType, TableSchema tableSchema, ExecutionConfig config,
 			InetAddress gatewayAddress, int gatewayPort) {
-		super(outputType, config, gatewayAddress, gatewayPort);
+		super(outputType, tableSchema, config, gatewayAddress, gatewayPort);
 
 		// prepare for changelog
 		changeRecordBuffer = new ArrayList<>();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
index 6cdebca..fe74cb9 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.experimental.SocketStreamIterator;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
@@ -55,7 +56,7 @@ public abstract class CollectStreamResult<C> extends BasicResult<C> implements D
 	protected final Object resultLock;
 	protected SqlExecutionException executionException;
 
-	public CollectStreamResult(RowTypeInfo outputType, ExecutionConfig config,
+	public CollectStreamResult(RowTypeInfo outputType, TableSchema tableSchema, ExecutionConfig config,
 			InetAddress gatewayAddress, int gatewayPort) {
 		this.outputType = outputType;
 
@@ -73,8 +74,7 @@ public abstract class CollectStreamResult<C> extends BasicResult<C> implements D
 
 		// create table sink
 		// pass binding address and port such that sink knows where to send to
-		collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer)
-			.configure(outputType.getFieldNames(), outputType.getFieldTypes());
+		collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer, tableSchema);
 		retrievalThread = new ResultRetrievalThread();
 		monitoringThread = new JobMonitoringThread();
 	}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
index dc482d0..2fe61b8 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.local.CollectBatchTableSink;
@@ -54,12 +55,11 @@ public class MaterializedCollectBatchResult<C> extends BasicResult<C> implements
 
 	private volatile boolean snapshotted = false;
 
-	public MaterializedCollectBatchResult(RowTypeInfo outputType, ExecutionConfig config) {
+	public MaterializedCollectBatchResult(TableSchema tableSchema, RowTypeInfo outputType, ExecutionConfig config) {
 		this.outputType = outputType;
 
 		accumulatorName = new AbstractID().toString();
-		tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config))
-			.configure(outputType.getFieldNames(), outputType.getFieldTypes());
+		tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config), tableSchema);
 		resultLock = new Object();
 		retrievalThread = new ResultRetrievalThread();
 
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
index 2becfda..7398758 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.types.Row;
@@ -91,12 +92,13 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
 	@VisibleForTesting
 	public MaterializedCollectStreamResult(
 			RowTypeInfo outputType,
+			TableSchema tableSchema,
 			ExecutionConfig config,
 			InetAddress gatewayAddress,
 			int gatewayPort,
 			int maxRowCount,
 			int overcommitThreshold) {
-		super(outputType, config, gatewayAddress, gatewayPort);
+		super(outputType, tableSchema, config, gatewayAddress, gatewayPort);
 
 		if (maxRowCount <= 0) {
 			this.maxRowCount = Integer.MAX_VALUE;
@@ -118,6 +120,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
 
 	public MaterializedCollectStreamResult(
 			RowTypeInfo outputType,
+			TableSchema tableSchema,
 			ExecutionConfig config,
 			InetAddress gatewayAddress,
 			int gatewayPort,
@@ -125,6 +128,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
 
 		this(
 			outputType,
+			tableSchema,
 			config,
 			gatewayAddress,
 			gatewayPort,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 5fabe83..aad2da1 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -44,6 +45,7 @@ import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase;
 import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.types.DataType;
 
 import org.junit.Test;
 
@@ -172,6 +174,7 @@ public class DependencyTest {
 	public static class TestHiveCatalogFactory extends HiveCatalogFactory {
 		public static final String ADDITIONAL_TEST_DATABASE = "additional_test_database";
 		public static final String TEST_TABLE = "test_table";
+		static final String TABLE_WITH_PARAMETERIZED_TYPES = "para_types_table";
 
 		@Override
 		public Map<String, String> requiredContext() {
@@ -213,11 +216,21 @@ public class DependencyTest {
 					),
 					false
 				);
+				// create a table to test parameterized types
+				hiveCatalog.createTable(new ObjectPath("default", TABLE_WITH_PARAMETERIZED_TYPES),
+						tableWithParameterizedTypes(),
+						false);
 			} catch (DatabaseAlreadyExistException | TableAlreadyExistException | DatabaseNotExistException e) {
 				throw new CatalogException(e);
 			}
 
 			return hiveCatalog;
 		}
+
+		private CatalogTable tableWithParameterizedTypes() {
+			TableSchema tableSchema = TableSchema.builder().fields(new String[]{"dec", "ch", "vch"},
+					new DataType[]{DataTypes.DECIMAL(10, 10), DataTypes.CHAR(5), DataTypes.VARCHAR(15)}).build();
+			return new CatalogTableImpl(tableSchema, Collections.emptyMap(), "");
+		}
 	}
 }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index cbae581..c102acf 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -49,6 +49,7 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -479,11 +480,14 @@ public class LocalExecutorITCase extends TestLogger {
 	@Test
 	public void testUseCatalogAndUseDatabase() throws Exception {
 		final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
-		final URL url = getClass().getClassLoader().getResource("test-data.csv");
-		Objects.requireNonNull(url);
+		final URL url1 = getClass().getClassLoader().getResource("test-data.csv");
+		final URL url2 = getClass().getClassLoader().getResource("test-data-1.csv");
+		Objects.requireNonNull(url1);
+		Objects.requireNonNull(url2);
 		final Map<String, String> replaceVars = new HashMap<>();
 		replaceVars.put("$VAR_PLANNER", planner);
-		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+		replaceVars.put("$VAR_SOURCE_PATH1", url1.getPath());
+		replaceVars.put("$VAR_SOURCE_PATH2", url2.getPath());
 		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
 		replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
 		replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
@@ -501,7 +505,8 @@ public class LocalExecutorITCase extends TestLogger {
 				Arrays.asList(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, HiveCatalog.DEFAULT_DB),
 				executor.listDatabases(session));
 
-			assertEquals(Collections.emptyList(), executor.listTables(session));
+			assertEquals(Collections.singletonList(DependencyTest.TestHiveCatalogFactory.TABLE_WITH_PARAMETERIZED_TYPES),
+					executor.listTables(session));
 
 			executor.useDatabase(session, DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE);
 
@@ -529,6 +534,36 @@ public class LocalExecutorITCase extends TestLogger {
 		executor.useCatalog(session, "nonexistingcatalog");
 	}
 
+	@Test
+	public void testParameterizedTypes() throws Exception {
+		// only blink planner supports parameterized types
+		Assume.assumeTrue(planner.equals("blink"));
+		final URL url1 = getClass().getClassLoader().getResource("test-data.csv");
+		final URL url2 = getClass().getClassLoader().getResource("test-data-1.csv");
+		Objects.requireNonNull(url1);
+		Objects.requireNonNull(url2);
+		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
+		replaceVars.put("$VAR_SOURCE_PATH1", url1.getPath());
+		replaceVars.put("$VAR_SOURCE_PATH2", url2.getPath());
+		replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
+		replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+		replaceVars.put("$VAR_MAX_ROWS", "100");
+		replaceVars.put("$VAR_RESULT_MODE", "table");
+
+		final Executor executor = createModifiedExecutor(CATALOGS_ENVIRONMENT_FILE, clusterClient, replaceVars);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+		executor.useCatalog(session, "hivecatalog");
+		String resultID = executor.executeQuery(session,
+				"select * from " + DependencyTest.TestHiveCatalogFactory.TABLE_WITH_PARAMETERIZED_TYPES).getResultId();
+		retrieveTableResult(executor, session, resultID);
+
+		// make sure legacy types still work
+		executor.useCatalog(session, "default_catalog");
+		resultID = executor.executeQuery(session, "select * from TableNumber3").getResultId();
+		retrieveTableResult(executor, session, resultID);
+	}
+
 	private void executeStreamQueryTable(
 			Map<String, String> replaceVars,
 			String query,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
index c7636cd..cf85011 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
@@ -22,7 +22,10 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;
@@ -43,11 +46,14 @@ public class MaterializedCollectStreamResultTest {
 	@Test
 	public void testSnapshot() throws UnknownHostException {
 		final RowTypeInfo type = new RowTypeInfo(Types.STRING, Types.LONG);
+		TableSchema tableSchema = TableSchema.builder().fields(
+				new String[]{"f0", "f1"}, new DataType[]{DataTypes.STRING(), DataTypes.BIGINT()}).build();
 
 		TestMaterializedCollectStreamResult<?> result = null;
 		try {
 			result = new TestMaterializedCollectStreamResult<>(
 				type,
+				tableSchema,
 				new ExecutionConfig(),
 				InetAddress.getLocalHost(),
 				0,
@@ -91,11 +97,14 @@ public class MaterializedCollectStreamResultTest {
 	@Test
 	public void testLimitedSnapshot() throws UnknownHostException {
 		final RowTypeInfo type = new RowTypeInfo(Types.STRING, Types.LONG);
+		TableSchema tableSchema = TableSchema.builder().fields(
+				new String[]{"f0", "f1"}, new DataType[]{DataTypes.STRING(), DataTypes.BIGINT()}).build();
 
 		TestMaterializedCollectStreamResult<?> result = null;
 		try {
 			result = new TestMaterializedCollectStreamResult<>(
 				type,
+				tableSchema,
 				new ExecutionConfig(),
 				InetAddress.getLocalHost(),
 				0,
@@ -146,6 +155,7 @@ public class MaterializedCollectStreamResultTest {
 
 		public TestMaterializedCollectStreamResult(
 				RowTypeInfo outputType,
+				TableSchema tableSchema,
 				ExecutionConfig config,
 				InetAddress gatewayAddress,
 				int gatewayPort,
@@ -154,6 +164,7 @@ public class MaterializedCollectStreamResultTest {
 
 			super(
 				outputType,
+				tableSchema,
 				config,
 				gatewayAddress,
 				gatewayPort,
@@ -163,6 +174,7 @@ public class MaterializedCollectStreamResultTest {
 
 		public TestMaterializedCollectStreamResult(
 				RowTypeInfo outputType,
+				TableSchema tableSchema,
 				ExecutionConfig config,
 				InetAddress gatewayAddress,
 				int gatewayPort,
@@ -170,6 +182,7 @@ public class MaterializedCollectStreamResultTest {
 
 			super(
 				outputType,
+				tableSchema,
 				config,
 				gatewayAddress,
 				gatewayPort,
diff --git a/flink-table/flink-sql-client/src/test/resources/test-data-1.csv b/flink-table/flink-sql-client/src/test/resources/test-data-1.csv
new file mode 100644
index 0000000..1794cb7
--- /dev/null
+++ b/flink-table/flink-sql-client/src/test/resources/test-data-1.csv
@@ -0,0 +1,18 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+123.123,abcd
\ No newline at end of file
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
index d4b8010..ff2ccea 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
@@ -71,6 +71,26 @@ tables:
   - name: TestView2
     type: view
     query: SELECT * FROM default_catalog.default_database.TestView1
+  - name: TableNumber3
+    type: source-table
+    $VAR_UPDATE_MODE
+    schema:
+      - name: DecimalField
+        type: DECIMAL
+      - name: StringField
+        type: VARCHAR
+    connector:
+      type: filesystem
+      path: "$VAR_SOURCE_PATH2"
+    format:
+      type: csv
+      fields:
+        - name: DecimalField
+          type: DECIMAL
+        - name: StringField
+          type: VARCHAR
+      line-delimiter: "\n"
+      comment-prefix: "#"
 
 functions:
   - name: scalarUDF
@@ -98,6 +118,7 @@ functions:
         value: 5
 
 execution:
+  planner: "$VAR_PLANNER"
   type: "$VAR_EXECUTION_TYPE"
   time-characteristic: event-time
   periodic-watermarks-interval: 99