Posted to user-zh@flink.apache.org by 赵峰 <zh...@163.com> on 2020/03/24 13:20:13 UTC

Can the Flink JDBC Driver create a Kafka table?

Hi,
Creating a Kafka table through the Flink JDBC Driver fails with an error. Is my table-creation code incorrect? The code is as follows:
Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();

statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"    user_id BIGINT,\n" +
"    item_id BIGINT,\n" +
"    category_id BIGINT,\n" +
"    behavior STRING,\n" +
"    ts TIMESTAMP(3),\n" +
"    proctime as PROCTIME(),\n" +
"    WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"    'connector.type' = 'kafka', \n" +
"    'connector.version' = 'universal', \n" +
"    'connector.topic' = 'flink_im02', \n" +
"    'connector.properties.group.id' = 'flink_im02_new',\n" +
"    'connector.startup-mode' = 'earliest-offset', \n" +
"    'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"    'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"    'format.type' = 'csv',\n" +
"    'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
    System.out.println(rs1.getLong(1) + ", " + rs1.getLong(2));
}

statement.close();
connection.close();

Error:
Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'




赵峰

Re: Does the Flink JDBC Driver support creating streaming tables?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Please refer to this document: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
The following syntax is probably not supported:
    'format.type' = 'csv',
    'format.field-delimiter' = '|'

Below is code that works for me. The data in Kafka needs to be in this format: {"order_no":"abcdefg","status":90}
tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
    + "    order_no VARCHAR,\n"
    + "    status INT\n"
    + ") WITH (\n"
    + "    'connector.type' = 'kafka',\n"
    + "    'connector.version' = 'universal',\n"
    + "    'connector.topic' = 'wanglei_test',\n"
    + "    'connector.startup-mode' = 'latest-offset',\n"
    + "    'connector.properties.0.key' = 'zookeeper.connect',\n"
    + "    'connector.properties.0.value' = 'xxx:2181',\n"
    + "    'connector.properties.1.key' = 'bootstrap.servers',\n"
    + "    'connector.properties.1.value' = 'xxx:9092',\n"
    + "    'update-mode' = 'append',\n"
    + "    'format.type' = 'json',\n"
    + "    'format.derive-schema' = 'true'\n"
    + ")");

王磊


wanglei2@geekplus.com.cn
 
From: 赵峰
Sent: 2020-03-24 21:28
To: user-zh
Subject: Does the Flink JDBC Driver support creating streaming tables?
