Posted to user-zh@flink.apache.org by RS <ti...@163.com> on 2020/07/27 09:31:30 UTC
kafka-connect JSON format adaptation issue?
Hi,
Pipeline: kafka -> Flink -> kafka -> mysql
Flink processes the data with SQL and writes it to Kafka as JSON; kafka-connect-jdbc then exports it to MySQL. kafka-connect is used so that the data can also be exported to other storage systems at the same time.
Flink output table definition:

CREATE TABLE print_table (
  total_count BIGINT,
  username STRING,
  update_time TIMESTAMP(6)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_out',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'sink.partitioner' = 'round-robin',
  'format' = 'json'
)
Example of an output record:
{"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}
However, the JSON format that kafka-connect-jdbc expects requires a schema and a payload, for example:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      }
    ],
    "optional": true,
    "name": "user"
  },
  "payload": {
    "id": 1,
    "name": "admin"
  }
}
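For reference, the gap between the two formats can be bridged mechanically. A minimal Python sketch (outside Flink, illustrative only; field names and types mirror the example envelope above) of wrapping a flat record into the schema+payload envelope:

```python
import json

# Kafka Connect-style "schema" for the flat record below; the type names
# (int64, string) follow the example envelope above.
SCHEMA = {
    "type": "struct",
    "fields": [
        {"type": "int64", "optional": False, "field": "id"},
        {"type": "string", "optional": True, "field": "name"},
    ],
    "optional": True,
    "name": "user",
}

def wrap(record):
    # Wrap a flat JSON record (what Flink's json format emits) into the
    # schema+payload envelope that kafka-connect-jdbc expects.
    return json.dumps({"schema": SCHEMA, "payload": record})

envelope = json.loads(wrap({"id": 1, "name": "admin"}))
print(envelope["payload"]["name"])  # admin
```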
How should this be handled in Flink (by adding the schema and payload?) so that the JSON matches what Kafka Connect expects?
Current Flink SQL:

INSERT INTO test_out (total_count, username, update_time)
SELECT count(1) AS total_count,
       username,
       TUMBLE_START(update_time, INTERVAL '1' MINUTE) AS update_time
FROM table1
GROUP BY username, TUMBLE(update_time, INTERVAL '1' MINUTE)
Re:Re: kafka-connect JSON format adaptation issue?
Posted by RS <ti...@163.com>.
Hi,
Ah, I noticed something is off: `schema` needs to be a JSON object (a dict), not a STRING. How can that be defined in SQL?
Thanks
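A quick Python sketch of the difference between the two serializations (illustrative only): when `schema` is declared as STRING in the DDL, the whole thing is emitted as one escaped string value, whereas Kafka Connect needs a nested JSON object.

```python
import json

schema_obj = {"type": "struct", "fields": [], "optional": True, "name": "user"}

# `schema` declared as STRING in the DDL: the serializer emits one escaped
# string value, e.g. "schema": "{\"type\": \"struct\", ...}"
as_string = json.dumps({"schema": json.dumps(schema_obj)})

# What kafka-connect-jdbc expects: "schema" is a nested JSON object
as_object = json.dumps({"schema": schema_obj})

print(type(json.loads(as_string)["schema"]).__name__)  # str
print(type(json.loads(as_object)["schema"]).__name__)  # dict
```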
Re: kafka-connect JSON format adaptation issue?
Posted by Leonard Xu <xb...@gmail.com>.
> After I changed TIMESTAMP(6) to TIMESTAMP(3) the error went away, so is the window time precision in Flink only 3 digits?

The time column in the window is presumably being used as a time attribute, which can only be TIMESTAMP(3). Other TIMESTAMP columns in Flink can support up to TIMESTAMP(9).

Best,
Leonard
Re:Re: kafka-connect JSON format adaptation issue?
Posted by RS <ti...@163.com>.
Hi,
I updated the SQL and ran into a new problem:

Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported cast from 'ROW<`EXPR$0` BIGINT NOT NULL, `EXPR$1` STRING, `EXPR$2` TIMESTAMP(3) *ROWTIME*> NOT NULL' to 'ROW<`total_count` BIGINT, `username` STRING, `update_time` TIMESTAMP(6)>'.

The time in the SELECT is defined like this: TUMBLE_START(update_time, INTERVAL '1' MINUTE) as update_time) as payload

After changing TIMESTAMP(6) to TIMESTAMP(3) the error disappeared, so is the window time precision in Flink only 3 digits?
Thanks
Re: kafka-connect JSON format adaptation issue?
Posted by Jark Wu <im...@gmail.com>.
Hi,
You need to add schema and payload in both the DDL and the query:

CREATE TABLE print_table (
  `schema` STRING,
  `payload` ROW<total_count BIGINT, username STRING, update_time TIMESTAMP(6)>
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_out',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'sink.partitioner' = 'round-robin',
  'format' = 'json'
)

-- In the DML you can hardcode schema as a constant and wrap payload with the ROW function
INSERT INTO output
SELECT '{ "type": "struct", ...}' AS `schema`,
       ROW(total_count, username, update_time) AS payload
FROM ...

Btw, may I ask why you need kafka-jdbc-connect to sync into MySQL? Isn't it more convenient to sync to MySQL directly with Flink SQL?

Best,
Jark
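Following this route, the hardcoded schema constant for the table above might look as follows (a sketch; the field names come from the thread, but describing update_time as a plain string is an assumption made to match the "yyyy-MM-dd HH:mm:ss" text that Flink's json format emits, and should be verified against the JDBC sink):

```python
import json

# Hypothetical Connect schema constant for
# (total_count BIGINT, username STRING, update_time TIMESTAMP).
# update_time is described as "string" here, matching Flink's textual
# timestamp output; whether the JDBC sink accepts that for a timestamp
# column is an assumption to verify.
schema_constant = json.dumps({
    "type": "struct",
    "fields": [
        {"type": "int64", "optional": False, "field": "total_count"},
        {"type": "string", "optional": True, "field": "username"},
        {"type": "string", "optional": True, "field": "update_time"},
    ],
    "optional": False,
    "name": "test_out",
})

# This is the literal one would embed in the query:
#   SELECT '<schema_constant>' AS `schema`, ROW(...) AS payload ...
print(schema_constant)
```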