Posted to user-zh@flink.apache.org by RS <ti...@163.com> on 2020/07/27 09:31:30 UTC

kafka-connect JSON format adaptation question?

hi,
kafka->Flink->kafka->mysql
Flink processes the data with SQL and writes the results to Kafka as JSON; kafka-connect-jdbc then exports the data from Kafka to MySQL.
We use kafka-connect so the data can also be exported to other storage systems at the same time.



The output table definition in Flink:

CREATE TABLE print_table \
(total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out', \
'properties.bootstrap.servers' = '127.0.0.1:9092', \
'sink.partitioner' = 'round-robin', \
'format' = 'json')




Example of the output data format:

{"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}




However, the JSON format for kafka-connect-jdbc requires a schema and a payload, for example:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      }
    ],
    "optional": true,
    "name": "user"
  },
  "payload": {
    "id": 1,
    "name": "admin"
  }
}
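
For reference, this schema/payload envelope is what Kafka Connect's JsonConverter expects when schemas are enabled. A hypothetical sink configuration along those lines (connection details and names are placeholders, not taken from the thread):

# hypothetical kafka-connect-jdbc sink config; URL and credentials are placeholders
name=mysql-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=test_out
connection.url=jdbc:mysql://127.0.0.1:3306/testdb
connection.user=user
connection.password=password
# schemas.enable=true is what makes the converter require the schema/payload envelope
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
insert.mode=insert
auto.create=true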




How should this be handled in Flink (adding the schema and payload?) so that the output matches the JSON format kafka-connect expects?

The current Flink SQL:

INSERT INTO test_out(total_count, username, update_time)
SELECT count(1) AS total_count, username, TUMBLE_START(update_time, INTERVAL '1' MINUTE) AS update_time
FROM table1
GROUP BY username, TUMBLE(update_time, INTERVAL '1' MINUTE)



Re:Re: kafka-connect JSON format adaptation question?

Posted by RS <ti...@163.com>.
Hi,
Ah, I noticed something is off: `schema` needs to be a dict (a JSON object), not a STRING. How can I define that in SQL?
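
One possibility is to declare `schema` as a nested ROW type instead of STRING and build it with the ROW and ARRAY constructors. This is an unverified sketch: I have not confirmed that the planner accepts nested ROW constructors inside ARRAY, and the field names simply mirror the Connect example above.

-- unverified sketch: `schema` declared as structured data instead of STRING
CREATE TABLE test_out (
  `schema` ROW<
    `type` STRING,
    `fields` ARRAY<ROW<`type` STRING, `optional` BOOLEAN, `field` STRING>>,
    `optional` BOOLEAN,
    `name` STRING>,
  `payload` ROW<total_count BIGINT, username STRING, update_time TIMESTAMP(3)>
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_out',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'format' = 'json')

INSERT INTO test_out
SELECT
  -- constant Connect schema built with ROW/ARRAY constructors
  ROW('struct',
      ARRAY[ROW('int64', false, 'total_count'),
            ROW('string', true, 'username'),
            ROW('string', true, 'update_time')],
      false, 'user') AS `schema`,
  ROW(total_count, username, update_time) AS payload
FROM ...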


Thanks

Re: kafka-connect JSON format adaptation question?

Posted by Leonard Xu <xb...@gmail.com>.
> After I changed TIMESTAMP(6) to TIMESTAMP(3), the error went away. So is the time precision of windows in Flink limited to 3 digits?

The window time is presumably being used as the time attribute column, and a time attribute can only be TIMESTAMP(3); other TIMESTAMP fields in Flink are supported up to TIMESTAMP(9).
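
In other words, the constraint comes from the time attribute declaration on the source, not from TIMESTAMP in general. A minimal sketch of what table1 presumably looks like (the watermark definition is an assumption based on the query):

CREATE TABLE table1 (
  username STRING,
  -- an event-time attribute column must be TIMESTAMP(3)
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (...)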

Best,
Leonard



Re:Re: kafka-connect JSON format adaptation question?

Posted by RS <ti...@163.com>.
Hi,
I changed the SQL and ran into a new problem:
Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported cast from 'ROW<`EXPR$0` BIGINT NOT NULL, `EXPR$1` STRING, `EXPR$2` TIMESTAMP(3) *ROWTIME*> NOT NULL' to 'ROW<`total_count` BIGINT, `username` STRING, `update_time` TIMESTAMP(6)>'.


The time in the SELECT is defined like this: TUMBLE_START(update_time,INTERVAL '1' MINUTE) as update_time) as payload


After I changed TIMESTAMP(6) to TIMESTAMP(3), the error went away. So is the time precision of windows in Flink limited to 3 digits?
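
For the record, the corresponding DDL fragment after that fix, and, if the sink really had to stay at TIMESTAMP(6), an explicit CAST in the query might also work, though I have not verified it:

-- payload precision aligned with the TUMBLE_START result
`payload` ROW<total_count BIGINT, username STRING, update_time TIMESTAMP(3)>

-- unverified alternative: cast explicitly when building the payload
ROW(total_count, username, CAST(TUMBLE_START(update_time, INTERVAL '1' MINUTE) AS TIMESTAMP(6))) AS payload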


Thanks

Re: kafka-connect JSON format adaptation question?

Posted by Jark Wu <im...@gmail.com>.
Hi,

You need to add schema and payload in both the DDL and the query:

CREATE TABLE print_table \
(`schema` STRING, `payload` ROW<total_count BIGINT, username STRING,
update_time TIMESTAMP(6)>) \
WITH (\
'connector' = 'kafka', \
'topic' = 'test_out', \
'properties.bootstrap.servers' = '127.0.0.1:9092', \
'sink.partitioner' = 'round-robin', \
'format' = 'json')

-- In the DML, you can hard-code the schema as a constant and wrap the payload with the ROW function
INSERT INTO output
SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username,
update_time) as payload
FROM ...
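
Filled in for this table, the constant might look like the following; the Connect type mapping (int64 for BIGINT, string for the formatted timestamp) and the schema name are my assumptions. Note that with `schema` declared as STRING, the json format serializes it as an escaped JSON string rather than a nested object, which is exactly what RS's follow-up runs into:

INSERT INTO print_table
SELECT
  -- hard-coded Connect schema; type mapping and name are assumptions
  '{"type":"struct","fields":[{"type":"int64","optional":false,"field":"total_count"},{"type":"string","optional":true,"field":"username"},{"type":"string","optional":true,"field":"update_time"}],"optional":false,"name":"user_stats"}' AS `schema`,
  ROW(total_count, username, update_time) AS payload
FROM ...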


Btw, may I ask why you need kafka-jdbc-connect to sync the data to mysql? Wouldn't it be more convenient to sync to mysql directly with Flink SQL?

Best,
Jark

