You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/09/11 18:35:18 UTC
[flink] branch release-1.9 updated: [FLINK-13653][sql-client]
ResultStore should avoid using RowTypeInfo when creating a result
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new f11cdd3 [FLINK-13653][sql-client] ResultStore should avoid using RowTypeInfo when creating a result
f11cdd3 is described below
commit f11cdd3bed438d46583368284afce74bc4ecd703
Author: Rui Li <li...@apache.org>
AuthorDate: Tue Aug 13 20:57:18 2019 +0800
[FLINK-13653][sql-client] ResultStore should avoid using RowTypeInfo when creating a result
Fix the issue that types with parameters, e.g. decimal, cannot be accessed via SQL client.
This closes #9432.
---
.../gateway/local/CollectBatchTableSink.java | 28 +++++---------
.../gateway/local/CollectStreamTableSink.java | 24 ++++--------
.../table/client/gateway/local/ResultStore.java | 5 ++-
.../local/result/ChangelogCollectStreamResult.java | 5 ++-
.../gateway/local/result/CollectStreamResult.java | 6 +--
.../result/MaterializedCollectBatchResult.java | 6 +--
.../result/MaterializedCollectStreamResult.java | 6 ++-
.../table/client/gateway/local/DependencyTest.java | 13 +++++++
.../client/gateway/local/LocalExecutorITCase.java | 43 ++++++++++++++++++++--
.../MaterializedCollectStreamResultTest.java | 13 +++++++
.../src/test/resources/test-data-1.csv | 18 +++++++++
.../test/resources/test-sql-client-catalogs.yaml | 21 +++++++++++
12 files changed, 139 insertions(+), 49 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
index 9dccfb7..1e3ca21 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
@@ -20,12 +20,13 @@ package org.apache.flink.table.client.gateway.local;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.table.sinks.OutputFormatTableSink;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
/**
@@ -35,13 +36,12 @@ public class CollectBatchTableSink extends OutputFormatTableSink<Row> implements
private final String accumulatorName;
private final TypeSerializer<Row> serializer;
+ private final TableSchema tableSchema;
- private String[] fieldNames;
- private TypeInformation<?>[] fieldTypes;
-
- public CollectBatchTableSink(String accumulatorName, TypeSerializer<Row> serializer) {
+ public CollectBatchTableSink(String accumulatorName, TypeSerializer<Row> serializer, TableSchema tableSchema) {
this.accumulatorName = accumulatorName;
this.serializer = serializer;
+ this.tableSchema = tableSchema;
}
/**
@@ -52,26 +52,18 @@ public class CollectBatchTableSink extends OutputFormatTableSink<Row> implements
}
@Override
- public TypeInformation<Row> getOutputType() {
- return Types.ROW_NAMED(fieldNames, fieldTypes);
- }
-
- @Override
- public String[] getFieldNames() {
- return fieldNames;
+ public DataType getConsumedDataType() {
+ return getTableSchema().toRowDataType();
}
@Override
- public TypeInformation<?>[] getFieldTypes() {
- return fieldTypes;
+ public TableSchema getTableSchema() {
+ return tableSchema;
}
@Override
public CollectBatchTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- final CollectBatchTableSink copy = new CollectBatchTableSink(accumulatorName, serializer);
- copy.fieldNames = fieldNames;
- copy.fieldTypes = fieldTypes;
- return copy;
+ return new CollectBatchTableSink(accumulatorName, serializer, tableSchema);
}
@Override
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
index ce8565a..63adb07 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.types.Row;
@@ -39,37 +40,28 @@ public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
private final InetAddress targetAddress;
private final int targetPort;
private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+ private final TableSchema tableSchema;
- private String[] fieldNames;
- private TypeInformation<?>[] fieldTypes;
-
- public CollectStreamTableSink(InetAddress targetAddress, int targetPort, TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+ public CollectStreamTableSink(InetAddress targetAddress, int targetPort, TypeSerializer<Tuple2<Boolean, Row>> serializer, TableSchema tableSchema) {
this.targetAddress = targetAddress;
this.targetPort = targetPort;
this.serializer = serializer;
+ this.tableSchema = tableSchema;
}
@Override
- public String[] getFieldNames() {
- return fieldNames;
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return fieldTypes;
+ public TableSchema getTableSchema() {
+ return tableSchema;
}
@Override
public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- final CollectStreamTableSink copy = new CollectStreamTableSink(targetAddress, targetPort, serializer);
- copy.fieldNames = fieldNames;
- copy.fieldTypes = fieldTypes;
- return copy;
+ return new CollectStreamTableSink(targetAddress, targetPort, serializer, tableSchema);
}
@Override
public TypeInformation<Row> getRecordType() {
- return Types.ROW_NAMED(fieldNames, fieldTypes);
+ return getTableSchema().toRowType();
}
@Override
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index c3cc12b..27a5278 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -67,10 +67,11 @@ public class ResultStore {
final int gatewayPort = getGatewayPort(env.getDeployment());
if (env.getExecution().isChangelogMode()) {
- return new ChangelogCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
+ return new ChangelogCollectStreamResult<>(outputType, schema, config, gatewayAddress, gatewayPort);
} else {
return new MaterializedCollectStreamResult<>(
outputType,
+ schema,
config,
gatewayAddress,
gatewayPort,
@@ -82,7 +83,7 @@ public class ResultStore {
if (!env.getExecution().isTableMode()) {
throw new SqlExecutionException("Results of batch queries can only be served in table mode.");
}
- return new MaterializedCollectBatchResult<>(outputType, config);
+ return new MaterializedCollectBatchResult<>(schema, outputType, config);
}
}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
index 38e9126..43f2de5 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.client.gateway.local.result;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
@@ -38,9 +39,9 @@ public class ChangelogCollectStreamResult<C> extends CollectStreamResult<C> impl
private List<Tuple2<Boolean, Row>> changeRecordBuffer;
private static final int CHANGE_RECORD_BUFFER_SIZE = 5_000;
- public ChangelogCollectStreamResult(RowTypeInfo outputType, ExecutionConfig config,
+ public ChangelogCollectStreamResult(RowTypeInfo outputType, TableSchema tableSchema, ExecutionConfig config,
InetAddress gatewayAddress, int gatewayPort) {
- super(outputType, config, gatewayAddress, gatewayPort);
+ super(outputType, tableSchema, config, gatewayAddress, gatewayPort);
// prepare for changelog
changeRecordBuffer = new ArrayList<>();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
index 6cdebca..fe74cb9 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.experimental.SocketStreamIterator;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
@@ -55,7 +56,7 @@ public abstract class CollectStreamResult<C> extends BasicResult<C> implements D
protected final Object resultLock;
protected SqlExecutionException executionException;
- public CollectStreamResult(RowTypeInfo outputType, ExecutionConfig config,
+ public CollectStreamResult(RowTypeInfo outputType, TableSchema tableSchema, ExecutionConfig config,
InetAddress gatewayAddress, int gatewayPort) {
this.outputType = outputType;
@@ -73,8 +74,7 @@ public abstract class CollectStreamResult<C> extends BasicResult<C> implements D
// create table sink
// pass binding address and port such that sink knows where to send to
- collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer)
- .configure(outputType.getFieldNames(), outputType.getFieldTypes());
+ collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer, tableSchema);
retrievalThread = new ResultRetrievalThread();
monitoringThread = new JobMonitoringThread();
}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
index dc482d0..2fe61b8 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.CollectBatchTableSink;
@@ -54,12 +55,11 @@ public class MaterializedCollectBatchResult<C> extends BasicResult<C> implements
private volatile boolean snapshotted = false;
- public MaterializedCollectBatchResult(RowTypeInfo outputType, ExecutionConfig config) {
+ public MaterializedCollectBatchResult(TableSchema tableSchema, RowTypeInfo outputType, ExecutionConfig config) {
this.outputType = outputType;
accumulatorName = new AbstractID().toString();
- tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config))
- .configure(outputType.getFieldNames(), outputType.getFieldTypes());
+ tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config), tableSchema);
resultLock = new Object();
retrievalThread = new ResultRetrievalThread();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
index 2becfda..7398758 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.types.Row;
@@ -91,12 +92,13 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
@VisibleForTesting
public MaterializedCollectStreamResult(
RowTypeInfo outputType,
+ TableSchema tableSchema,
ExecutionConfig config,
InetAddress gatewayAddress,
int gatewayPort,
int maxRowCount,
int overcommitThreshold) {
- super(outputType, config, gatewayAddress, gatewayPort);
+ super(outputType, tableSchema, config, gatewayAddress, gatewayPort);
if (maxRowCount <= 0) {
this.maxRowCount = Integer.MAX_VALUE;
@@ -118,6 +120,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
public MaterializedCollectStreamResult(
RowTypeInfo outputType,
+ TableSchema tableSchema,
ExecutionConfig config,
InetAddress gatewayAddress,
int gatewayPort,
@@ -125,6 +128,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
this(
outputType,
+ tableSchema,
config,
gatewayAddress,
gatewayPort,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 5fabe83..aad2da1 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
@@ -44,6 +45,7 @@ import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase;
import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.types.DataType;
import org.junit.Test;
@@ -172,6 +174,7 @@ public class DependencyTest {
public static class TestHiveCatalogFactory extends HiveCatalogFactory {
public static final String ADDITIONAL_TEST_DATABASE = "additional_test_database";
public static final String TEST_TABLE = "test_table";
+ static final String TABLE_WITH_PARAMETERIZED_TYPES = "para_types_table";
@Override
public Map<String, String> requiredContext() {
@@ -213,11 +216,21 @@ public class DependencyTest {
),
false
);
+ // create a table to test parameterized types
+ hiveCatalog.createTable(new ObjectPath("default", TABLE_WITH_PARAMETERIZED_TYPES),
+ tableWithParameterizedTypes(),
+ false);
} catch (DatabaseAlreadyExistException | TableAlreadyExistException | DatabaseNotExistException e) {
throw new CatalogException(e);
}
return hiveCatalog;
}
+
+ private CatalogTable tableWithParameterizedTypes() {
+ TableSchema tableSchema = TableSchema.builder().fields(new String[]{"dec", "ch", "vch"},
+ new DataType[]{DataTypes.DECIMAL(10, 10), DataTypes.CHAR(5), DataTypes.VARCHAR(15)}).build();
+ return new CatalogTableImpl(tableSchema, Collections.emptyMap(), "");
+ }
}
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index cbae581..c102acf 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -49,6 +49,7 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -479,11 +480,14 @@ public class LocalExecutorITCase extends TestLogger {
@Test
public void testUseCatalogAndUseDatabase() throws Exception {
final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
- final URL url = getClass().getClassLoader().getResource("test-data.csv");
- Objects.requireNonNull(url);
+ final URL url1 = getClass().getClassLoader().getResource("test-data.csv");
+ final URL url2 = getClass().getClassLoader().getResource("test-data-1.csv");
+ Objects.requireNonNull(url1);
+ Objects.requireNonNull(url2);
final Map<String, String> replaceVars = new HashMap<>();
replaceVars.put("$VAR_PLANNER", planner);
- replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+ replaceVars.put("$VAR_SOURCE_PATH1", url1.getPath());
+ replaceVars.put("$VAR_SOURCE_PATH2", url2.getPath());
replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
@@ -501,7 +505,8 @@ public class LocalExecutorITCase extends TestLogger {
Arrays.asList(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, HiveCatalog.DEFAULT_DB),
executor.listDatabases(session));
- assertEquals(Collections.emptyList(), executor.listTables(session));
+ assertEquals(Collections.singletonList(DependencyTest.TestHiveCatalogFactory.TABLE_WITH_PARAMETERIZED_TYPES),
+ executor.listTables(session));
executor.useDatabase(session, DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE);
@@ -529,6 +534,36 @@ public class LocalExecutorITCase extends TestLogger {
executor.useCatalog(session, "nonexistingcatalog");
}
+ @Test
+ public void testParameterizedTypes() throws Exception {
+ // only blink planner supports parameterized types
+ Assume.assumeTrue(planner.equals("blink"));
+ final URL url1 = getClass().getClassLoader().getResource("test-data.csv");
+ final URL url2 = getClass().getClassLoader().getResource("test-data-1.csv");
+ Objects.requireNonNull(url1);
+ Objects.requireNonNull(url2);
+ final Map<String, String> replaceVars = new HashMap<>();
+ replaceVars.put("$VAR_PLANNER", planner);
+ replaceVars.put("$VAR_SOURCE_PATH1", url1.getPath());
+ replaceVars.put("$VAR_SOURCE_PATH2", url2.getPath());
+ replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
+ replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+ replaceVars.put("$VAR_MAX_ROWS", "100");
+ replaceVars.put("$VAR_RESULT_MODE", "table");
+
+ final Executor executor = createModifiedExecutor(CATALOGS_ENVIRONMENT_FILE, clusterClient, replaceVars);
+ final SessionContext session = new SessionContext("test-session", new Environment());
+ executor.useCatalog(session, "hivecatalog");
+ String resultID = executor.executeQuery(session,
+ "select * from " + DependencyTest.TestHiveCatalogFactory.TABLE_WITH_PARAMETERIZED_TYPES).getResultId();
+ retrieveTableResult(executor, session, resultID);
+
+ // make sure legacy types still work
+ executor.useCatalog(session, "default_catalog");
+ resultID = executor.executeQuery(session, "select * from TableNumber3").getResultId();
+ retrieveTableResult(executor, session, resultID);
+ }
+
private void executeStreamQueryTable(
Map<String, String> replaceVars,
String query,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
index c7636cd..cf85011 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
@@ -22,7 +22,10 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.junit.Test;
@@ -43,11 +46,14 @@ public class MaterializedCollectStreamResultTest {
@Test
public void testSnapshot() throws UnknownHostException {
final RowTypeInfo type = new RowTypeInfo(Types.STRING, Types.LONG);
+ TableSchema tableSchema = TableSchema.builder().fields(
+ new String[]{"f0", "f1"}, new DataType[]{DataTypes.STRING(), DataTypes.BIGINT()}).build();
TestMaterializedCollectStreamResult<?> result = null;
try {
result = new TestMaterializedCollectStreamResult<>(
type,
+ tableSchema,
new ExecutionConfig(),
InetAddress.getLocalHost(),
0,
@@ -91,11 +97,14 @@ public class MaterializedCollectStreamResultTest {
@Test
public void testLimitedSnapshot() throws UnknownHostException {
final RowTypeInfo type = new RowTypeInfo(Types.STRING, Types.LONG);
+ TableSchema tableSchema = TableSchema.builder().fields(
+ new String[]{"f0", "f1"}, new DataType[]{DataTypes.STRING(), DataTypes.BIGINT()}).build();
TestMaterializedCollectStreamResult<?> result = null;
try {
result = new TestMaterializedCollectStreamResult<>(
type,
+ tableSchema,
new ExecutionConfig(),
InetAddress.getLocalHost(),
0,
@@ -146,6 +155,7 @@ public class MaterializedCollectStreamResultTest {
public TestMaterializedCollectStreamResult(
RowTypeInfo outputType,
+ TableSchema tableSchema,
ExecutionConfig config,
InetAddress gatewayAddress,
int gatewayPort,
@@ -154,6 +164,7 @@ public class MaterializedCollectStreamResultTest {
super(
outputType,
+ tableSchema,
config,
gatewayAddress,
gatewayPort,
@@ -163,6 +174,7 @@ public class MaterializedCollectStreamResultTest {
public TestMaterializedCollectStreamResult(
RowTypeInfo outputType,
+ TableSchema tableSchema,
ExecutionConfig config,
InetAddress gatewayAddress,
int gatewayPort,
@@ -170,6 +182,7 @@ public class MaterializedCollectStreamResultTest {
super(
outputType,
+ tableSchema,
config,
gatewayAddress,
gatewayPort,
diff --git a/flink-table/flink-sql-client/src/test/resources/test-data-1.csv b/flink-table/flink-sql-client/src/test/resources/test-data-1.csv
new file mode 100644
index 0000000..1794cb7
--- /dev/null
+++ b/flink-table/flink-sql-client/src/test/resources/test-data-1.csv
@@ -0,0 +1,18 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+123.123,abcd
\ No newline at end of file
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
index d4b8010..ff2ccea 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
@@ -71,6 +71,26 @@ tables:
- name: TestView2
type: view
query: SELECT * FROM default_catalog.default_database.TestView1
+ - name: TableNumber3
+ type: source-table
+ $VAR_UPDATE_MODE
+ schema:
+ - name: DecimalField
+ type: DECIMAL
+ - name: StringField
+ type: VARCHAR
+ connector:
+ type: filesystem
+ path: "$VAR_SOURCE_PATH2"
+ format:
+ type: csv
+ fields:
+ - name: DecimalField
+ type: DECIMAL
+ - name: StringField
+ type: VARCHAR
+ line-delimiter: "\n"
+ comment-prefix: "#"
functions:
- name: scalarUDF
@@ -98,6 +118,7 @@ functions:
value: 5
execution:
+ planner: "$VAR_PLANNER"
type: "$VAR_EXECUTION_TYPE"
time-characteristic: event-time
periodic-watermarks-interval: 99