Posted to user-zh@flink.apache.org by "Chenzhiyuan(HR)" <zh...@huawei.com> on 2021/07/09 00:59:36 UTC

How to define a table from a complex Kafka message body

Hi all,
I have defined a table that reads from Kafka, but I don't know how to parse the messages, i.e. which format.type to use.
If json and avro can't handle this, do I need to implement a custom format?
I'm not sure how to write one; any guidance would be much appreciated.

The table definition:
CREATE TABLE MyUserTable (
    uuid VARCHAR,
    orgId VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'topic_name',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.group.id' = 'testGroup',
    'format.type' = '?'
)


The Kafka message body looks like this; it doesn't seem to match a standard format such as avro:

{
    "beforeData": [],
    "byteSize": 272,
    "columnNumber": 32,
    "data": [{
        "byteSize": 8,
        "columnName": "APPLY_PERSON_ID",
        "rawData": 10017,
        "type": "LONG"
    }, {
        "byteSize": 12,
        "columnName": "UPDATE_SALARY",
        "rawData": "11000.000000",
        "type": "DOUBLE"
    }, {
        "byteSize": 11,
        "columnName": "UP_AMOUNT",
        "rawData": "1000.000000",
        "type": "DOUBLE"
    }, {
        "byteSize": 3,
        "columnName": "CURRENCY",
        "rawData": "CNY",
        "type": "STRING"
    }, {
        "byteSize": 32,
        "columnName": "EXCHANGE_RATE",
        "rawData": "1.000000000000000000000000000000",
        "type": "DOUBLE"
    }, {
        "byteSize": 11,
        "columnName": "DEDUCTED_ACCOUNT",
        "rawData": "1000.000000",
        "type": "DOUBLE"
    }, {
        "byteSize": 1,
        "columnName": "ENTER_AT_PROCESS",
        "rawData": "Y",
        "type": "STRING"
    }],
    "dataCount": 0,
    "dataMetaData": {
        "connector": "mysql",
        "pos": 1000368076,
        "row": 0,
        "ts_ms": 1625565737000,
        "snapshot": "false",
        "db": "testdb",
        "table": "flow_person_t"
    },
    "key": "APPLY_PERSON_ID",
    "memorySize": 1120,
    "operation": "insert",
    "rowIndex": -1,
    "timestamp": "1970-01-01 00:00:00"
}


Re: How to define a table from a complex Kafka message body

Posted by 东东 <do...@163.com>.
Implementing your own DeserializationFormatFactory is enough.

You can use the official CanalJsonFormatFactory or DebeziumJsonFormatFactory as references.
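For Flink 1.11+ (the DynamicTableFactory stack), the rough shape of such a factory is sketched below. The class name MyFormatFactory and the identifier "my-json" are illustrative, the actual deserialization logic is elided, and the CanalJsonFormatFactory / DebeziumJsonFormatFactory sources remain the authoritative templates:

```java
import java.util.Collections;
import java.util.Set;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;

// Skeleton of a custom format factory; "my-json" is the name you would
// later reference in the DDL.
public class MyFormatFactory implements DeserializationFormatFactory {

    public static final String IDENTIFIER = "my-json";

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context context, DataType producedDataType) {
                // Return a DeserializationSchema<RowData> here that parses the
                // message body (the data / columnName / rawData layout) into
                // rows of the produced type.
                throw new UnsupportedOperationException("left as an exercise");
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}
```

The factory also has to be registered via SPI (add the fully qualified class name to META-INF/services/org.apache.flink.table.factories.Factory), after which the new-style DDL can reference it with 'format' = 'my-json'.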




Re: Re: How to define a table from a complex Kafka message body

Posted by Caizhi Weng <ts...@gmail.com>.
Hi! As in the article JasonLee shared, you can first describe the structure of the Kafka message in the DDL, and then extract fields such as APPLY_PERSON_ID via CREATE VIEW in your SQL code to get the result you want.

Each of your Kafka messages seems to correspond to exactly one row of MyUserTable, so there appears to be no need for a column-to-row transformation.
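A sketch of that approach using the legacy descriptor properties from this thread. The table name RawTable and the view are illustrative; the ARRAY<ROW<...>> type mirrors the data field of the message, and the positional accesses data[1], data[2], ... assume the columns always arrive in the same order, which you would need to verify:

```sql
-- Declare only the parts of the message that are needed; `data` is mapped
-- as an array of rows so the JSON format can parse it as-is.
CREATE TABLE RawTable (
    data ARRAY<ROW<byteSize INT, columnName VARCHAR, rawData VARCHAR, `type` VARCHAR>>,
    `operation` VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'topic_name',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.group.id' = 'testGroup',
    'format.type' = 'json'
);

-- Pick fields out of the array by position (assumes a fixed column order;
-- Flink SQL arrays are 1-based).
CREATE VIEW MyUserView AS
SELECT
    data[1].rawData AS APPLY_PERSON_ID,
    CAST(data[2].rawData AS DECIMAL(18, 6)) AS UPDATE_SALARY,
    CAST(data[3].rawData AS DECIMAL(18, 6)) AS UP_AMOUNT,
    data[4].rawData AS CURRENCY,
    CAST(data[5].rawData AS DECIMAL(38, 30)) AS EXCHANGE_RATE
FROM RawTable;
```

One caveat from the sample message: rawData is a JSON number for APPLY_PERSON_ID (10017) but a JSON string for the other columns, and a strict JSON format may reject such mixed types against a single declared type; that mismatch alone can be a reason to fall back to a custom format.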


Reply: Re: How to define a table from a complex Kafka message body

Posted by "Chenzhiyuan(HR)" <zh...@huawei.com>.
Can the column-to-row transformation be written in the JSON DDL itself, or does it have to be done in Java code after the Kafka data is consumed?


Re: How to define a table from a complex Kafka message body

Posted by JasonLee <17...@163.com>.
Hi

In fact, your data field is a JSON array; you just need a column-to-row transformation that keys on the columnName field.

Best
JasonLee
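A sketch of that column-to-row step, assuming a hypothetical RawTable whose data field is declared as ARRAY<ROW<columnName VARCHAR, rawData VARCHAR>> (table name and schema are illustrative, not from the thread):

```sql
-- Explode the `data` array: one output row per array element, i.e. one
-- row per (columnName, rawData) pair in the original Kafka message.
SELECT t.columnName, t.rawData
FROM RawTable
CROSS JOIN UNNEST(data) AS t (columnName, rawData);
```

From there the per-column rows can be filtered or pivoted on columnName as needed.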



Re: Reply: Re: How to define a table from a complex Kafka message body

Posted by mack143 <ma...@163.com>.
Unsubscribe

Reply: Re: How to define a table from a complex Kafka message body

Posted by "Chenzhiyuan(HR)" <zh...@huawei.com>.
In the message body, data holds the actual row data: columnName is the field name and rawData is the value. So I would like to define the table as follows:

CREATE TABLE MyUserTable (
    APPLY_PERSON_ID VARCHAR,
    UPDATE_SALARY DECIMAL,
    UP_AMOUNT DECIMAL,
    CURRENCY VARCHAR,
    EXCHANGE_RATE DECIMAL
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'topic_name',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.group.id' = 'testGroup',
    'format.type' = '?'
)

Then I want to query each field directly:
Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, CURRENCY, EXCHANGE_RATE from MyUserTable ");

How should the DDL be defined for this?



From: 17610775726 [mailto:17610775726@163.com]
Sent: 2021-07-09 9:26
To: Chenzhiyuan(HR) <zh...@huawei.com>
Subject: Re: How to define a table from a complex Kafka message body

hi

Just use json. You can refer to this article: https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA

Best
JasonLee