You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dezhi Cai (Jira)" <ji...@apache.org> on 2019/11/29 15:04:00 UTC
[jira] [Created] (FLINK-14994) StreamTableEnvironment.connect throw
exception when using "FileSystem" connector and "CSV" format
Dezhi Cai created FLINK-14994:
---------------------------------
Summary: StreamTableEnvironment.connect throw exception when using "FileSystem" connector and "CSV" format
Key: FLINK-14994
URL: https://issues.apache.org/jira/browse/FLINK-14994
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem, Table SQL / API
Affects Versions: 1.9.1, 1.9.0
Reporter: Dezhi Cai
Attachments: image-2019-11-29-22-50-50-367.png
I use two approaches to register table in StreamTableEnvironment. The DDL approach run fine and the "StreamTableEnvironment.connect" one throw exception.
{color:#FF0000}the root cause :{color}
"CsvTableSourceFactoryBase.supportedProperties" does't include "format.schema", it cause that "org.apache.flink.table.factories.TableFactoryService#filterBySupportedProperties" return no "TableSourceFactory"
this approach run successfully (using DDL)
{code:java}
public static void main1(String[] args) throws Exception{
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings);
String sql = "create table test(last_update_dt TIMESTAMP) " +
"with (" +
"'connector.type' = 'filesystem'," +
"'connector.path' = 'C:/work/1.csv'," +
"'format.type' = 'csv'," +
"'format.fields.0.name' = 'last_update_dt'," +
"'format.fields.0.type' = 'TIMESTAMP'" +
")";
streamTableEnvironment.sqlUpdate(sql);
Table data = streamTableEnvironment.sqlQuery("select * from test");
streamTableEnvironment.toAppendStream(data, Row.class).print();
environment.execute();
}{code}
this approach throw Exception
{code:java}
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(environment, settings);
streamTableEnvironment.connect(
new FileSystem()
.path("c:/work/1.csv")
).withFormat(
new Csv()
.schema(Types.ROW(Types.SQL_TIMESTAMP))
).withSchema(
new Schema()
.field("last_update_dt", Types.SQL_TIMESTAMP)
).inAppendMode().registerTableSource("test");
Table data = streamTableEnvironment.sqlQuery("select * from test");
streamTableEnvironment.toAppendStream(data, Row.class).print();
environment.execute();
}
{code}
!image-2019-11-29-22-50-50-367.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)