You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by air23 <wa...@163.com> on 2020/07/27 11:21:51 UTC
回复:解析kafka的mysql binlog问题
我再上传一次
在2020年07月27日 18:55,Jark Wu 写道:
Hi,
你的附件好像没有上传。
On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'order_source'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
>
>
> 具体见附件 有打印
>
>
>
>
>
Re:回复:解析kafka的mysql binlog问题
Posted by RS <ti...@163.com>.
Hi,
附近应该是收不到的,包括图片啥的
只能回复纯文本,贴代码,如果真的需要图片的话,可以上传到其他的网站上,然后给个连接跳转过去
在 2020-07-27 19:21:51,"air23" <wa...@163.com> 写道:
我再上传一次
在2020年07月27日 18:55,Jark Wu 写道:
Hi,
你的附件好像没有上传。
On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'order_source'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
>
>
> 具体见附件 有打印
>
>
>
>
>
Re: 解析kafka的mysql binlog问题
Posted by admin <17...@163.com>.
直接转成string1.11版本还不支持,会在1.12修复,参考jira[1]
[1]https://issues.apache.org/jira/browse/FLINK-18002 <https://issues.apache.org/jira/browse/FLINK-18002>
> 2020年7月28日 下午5:20,air23 <wa...@163.com> 写道:
>
> 你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致
> 另外想请教下 1.11 版本 datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的,
> 但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-28 16:02:18,"Jark Wu" <im...@gmail.com> 写道:
>> 因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>> 1.12 中已经支持读取复杂结构为 string 类型了。
>>
>> Best,
>> Jark
>>
>> On Tue, 28 Jul 2020 at 15:36, air23 <wa...@163.com> wrote:
>>
>>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>>
>>>
>>> {
>>> "data":[
>>> {
>>> "op_id":"97037138",
>>> "order_id":"84172164"
>>> }
>>> ],
>>> "database":"order_11",
>>> "es":1595720375000,
>>> "id":17469027,
>>> "isDdl":false,
>>> "mysqlType":{
>>> "op_id":"int(11)",
>>> "order_id":"int(11)"
>>> },
>>> "old":null,
>>> "pkNames":[
>>> "op_id"
>>> ],
>>> "sql":"",
>>> "sqlType":{
>>> "op_id":4,
>>> "order_id":4
>>> },
>>> "table":"order_product",
>>> "ts":1595720375837,
>>> "type":"INSERT"
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-07-28 14:44:35,"Jark Wu" <im...@gmail.com> 写道:
>>>> 有kafka 中json 数据的样例不?
>>>> 有没有看过 TaskManager 中有没有异常 log 信息?
>>>>
>>>>
>>>>
>>>> On Tue, 28 Jul 2020 at 09:40, air23 <wa...@163.com> wrote:
>>>>
>>>>> 你好 测试代码如下
>>>>>
>>>>>
>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>>>> " `data` VARCHAR , " +
>>>>> " `table` VARCHAR " +
>>>>> ") WITH (" +
>>>>> " 'connector' = 'kafka'," +
>>>>> " 'topic' = 'source_databases'," +
>>>>> " 'properties.bootstrap.servers' = '***'," +
>>>>> " 'properties.group.id' = 'real1'," +
>>>>> " 'format' = 'json'," +
>>>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>>>> ")";
>>>>> public static void main(String[] args) throws Exception {
>>>>>
>>>>>
>>>>> //bink table
>>>>> StreamExecutionEnvironment bsEnv =
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> EnvironmentSettings bsSettings =
>>>>>
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>> StreamTableEnvironment bsTableEnv =
>>>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>>>>
>>>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>>>>
>>>>>
>>>>> tableResult.print();
>>>>>
>>>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>>>>
>>>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>>>>
>>>>> bsEnv.execute("aa");
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog
>>>>> ,order_operation_time
>>>>> ,inventory_batch_log
>>>>> ,order_log
>>>>> ,order_address_book
>>>>> ,product_inventory
>>>>> ,order_physical_relation
>>>>> ,bil_business_attach
>>>>> ,picking_detail
>>>>> ,picking_detail
>>>>> ,orders
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>>>> 看到例子都是useOldPlanner 来转table的。
>>>>> 致谢
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 在 2020-07-27 19:44:10,"Jark Wu" <im...@gmail.com> 写道:
>>>>>> 抱歉,还是没有看到附件。
>>>>>> 如果是文本的话,你可以直接贴到邮件里。
>>>>>>
>>>>>> On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
>>>>>>
>>>>>>> 我再上传一次
>>>>>>>
>>>>>>> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
>>>>>>>
>>>>>>> Hi,
>>>>>>> 你的附件好像没有上传。
>>>>>>>
>>>>>>> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
>>>>>>>
>>>>>>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
>>> 不能取到data呢?*
>>>>>>>>
>>>>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
>>> (\n"
>>>>> +
>>>>>>>> " `data` VARCHAR , " +
>>>>>>>> " `table` VARCHAR " +
>>>>>>>> ") WITH (" +
>>>>>>>> " 'connector' = 'kafka'," +
>>>>>>>> " 'topic' = 'order_source'," +
>>>>>>>> " 'properties.bootstrap.servers' = '***'," +
>>>>>>>> " 'properties.group.id' = 'real1'," +
>>>>>>>> " 'format' = 'json'," +
>>>>>>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>>>>>>> ")";
>>>>>>>>
>>>>>>>>
>>>>>>>> 具体见附件 有打印
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>
Re:Re: Re: Re: 解析kafka的mysql binlog问题
Posted by air23 <wa...@163.com>.
你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致
另外想请教下 1.11 版本 datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的,
但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。
在 2020-07-28 16:02:18,"Jark Wu" <im...@gmail.com> 写道:
>因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>1.12 中已经支持读取复杂结构为 string 类型了。
>
>Best,
>Jark
>
>On Tue, 28 Jul 2020 at 15:36, air23 <wa...@163.com> wrote:
>
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>
>>
>> {
>> "data":[
>> {
>> "op_id":"97037138",
>> "order_id":"84172164"
>> }
>> ],
>> "database":"order_11",
>> "es":1595720375000,
>> "id":17469027,
>> "isDdl":false,
>> "mysqlType":{
>> "op_id":"int(11)",
>> "order_id":"int(11)"
>> },
>> "old":null,
>> "pkNames":[
>> "op_id"
>> ],
>> "sql":"",
>> "sqlType":{
>> "op_id":4,
>> "order_id":4
>> },
>> "table":"order_product",
>> "ts":1595720375837,
>> "type":"INSERT"
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-28 14:44:35,"Jark Wu" <im...@gmail.com> 写道:
>> >有kafka 中json 数据的样例不?
>> >有没有看过 TaskManager 中有没有异常 log 信息?
>> >
>> >
>> >
>> >On Tue, 28 Jul 2020 at 09:40, air23 <wa...@163.com> wrote:
>> >
>> >> 你好 测试代码如下
>> >>
>> >>
>> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> >> " `data` VARCHAR , " +
>> >> " `table` VARCHAR " +
>> >> ") WITH (" +
>> >> " 'connector' = 'kafka'," +
>> >> " 'topic' = 'source_databases'," +
>> >> " 'properties.bootstrap.servers' = '***'," +
>> >> " 'properties.group.id' = 'real1'," +
>> >> " 'format' = 'json'," +
>> >> " 'scan.startup.mode' = 'earliest-offset'" +
>> >> ")";
>> >> public static void main(String[] args) throws Exception {
>> >>
>> >>
>> >> //bink table
>> >> StreamExecutionEnvironment bsEnv =
>> >> StreamExecutionEnvironment.getExecutionEnvironment();
>> >> EnvironmentSettings bsSettings =
>> >>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >> StreamTableEnvironment bsTableEnv =
>> >> StreamTableEnvironment.create(bsEnv, bsSettings);
>> >>
>> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>> >>
>> >>
>> >> tableResult.print();
>> >>
>> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>> >>
>> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>> >>
>> >> bsEnv.execute("aa");
>> >>
>> >> }
>> >>
>> >>
>> >>
>> >>
>> >> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog
>> >> ,order_operation_time
>> >> ,inventory_batch_log
>> >> ,order_log
>> >> ,order_address_book
>> >> ,product_inventory
>> >> ,order_physical_relation
>> >> ,bil_business_attach
>> >> ,picking_detail
>> >> ,picking_detail
>> >> ,orders
>> >>
>> >>
>> >>
>> >>
>> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> >> 看到例子都是useOldPlanner 来转table的。
>> >> 致谢
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-07-27 19:44:10,"Jark Wu" <im...@gmail.com> 写道:
>> >> >抱歉,还是没有看到附件。
>> >> >如果是文本的话,你可以直接贴到邮件里。
>> >> >
>> >> >On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
>> >> >
>> >> >> 我再上传一次
>> >> >>
>> >> >> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
>> >> >>
>> >> >> Hi,
>> >> >> 你的附件好像没有上传。
>> >> >>
>> >> >> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
>> >> >>
>> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
>> 不能取到data呢?*
>> >> >> >
>> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
>> (\n"
>> >> +
>> >> >> > " `data` VARCHAR , " +
>> >> >> > " `table` VARCHAR " +
>> >> >> > ") WITH (" +
>> >> >> > " 'connector' = 'kafka'," +
>> >> >> > " 'topic' = 'order_source'," +
>> >> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> >> > " 'properties.group.id' = 'real1'," +
>> >> >> > " 'format' = 'json'," +
>> >> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> >> > ")";
>> >> >> >
>> >> >> >
>> >> >> > 具体见附件 有打印
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >>
>> >> >>
>> >>
>>
Re: Re: Re: 解析kafka的mysql binlog问题
Posted by Jark Wu <im...@gmail.com>.
因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
1.12 中已经支持读取复杂结构为 string 类型了。
Best,
Jark
On Tue, 28 Jul 2020 at 15:36, air23 <wa...@163.com> wrote:
> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
> {
> "data":[
> {
> "op_id":"97037138",
> "order_id":"84172164"
> }
> ],
> "database":"order_11",
> "es":1595720375000,
> "id":17469027,
> "isDdl":false,
> "mysqlType":{
> "op_id":"int(11)",
> "order_id":"int(11)"
> },
> "old":null,
> "pkNames":[
> "op_id"
> ],
> "sql":"",
> "sqlType":{
> "op_id":4,
> "order_id":4
> },
> "table":"order_product",
> "ts":1595720375837,
> "type":"INSERT"
> }
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-28 14:44:35,"Jark Wu" <im...@gmail.com> 写道:
> >有kafka 中json 数据的样例不?
> >有没有看过 TaskManager 中有没有异常 log 信息?
> >
> >
> >
> >On Tue, 28 Jul 2020 at 09:40, air23 <wa...@163.com> wrote:
> >
> >> 你好 测试代码如下
> >>
> >>
> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> >> " `data` VARCHAR , " +
> >> " `table` VARCHAR " +
> >> ") WITH (" +
> >> " 'connector' = 'kafka'," +
> >> " 'topic' = 'source_databases'," +
> >> " 'properties.bootstrap.servers' = '***'," +
> >> " 'properties.group.id' = 'real1'," +
> >> " 'format' = 'json'," +
> >> " 'scan.startup.mode' = 'earliest-offset'" +
> >> ")";
> >> public static void main(String[] args) throws Exception {
> >>
> >>
> >> //bink table
> >> StreamExecutionEnvironment bsEnv =
> >> StreamExecutionEnvironment.getExecutionEnvironment();
> >> EnvironmentSettings bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >> StreamTableEnvironment bsTableEnv =
> >> StreamTableEnvironment.create(bsEnv, bsSettings);
> >>
> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
> >>
> >>
> >> tableResult.print();
> >>
> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
> >>
> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
> >>
> >> bsEnv.execute("aa");
> >>
> >> }
> >>
> >>
> >>
> >>
> >> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog
> >> ,order_operation_time
> >> ,inventory_batch_log
> >> ,order_log
> >> ,order_address_book
> >> ,product_inventory
> >> ,order_physical_relation
> >> ,bil_business_attach
> >> ,picking_detail
> >> ,picking_detail
> >> ,orders
> >>
> >>
> >>
> >>
> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> >> 看到例子都是useOldPlanner 来转table的。
> >> 致谢
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-27 19:44:10,"Jark Wu" <im...@gmail.com> 写道:
> >> >抱歉,还是没有看到附件。
> >> >如果是文本的话,你可以直接贴到邮件里。
> >> >
> >> >On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
> >> >
> >> >> 我再上传一次
> >> >>
> >> >> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
> >> >>
> >> >> Hi,
> >> >> 你的附件好像没有上传。
> >> >>
> >> >> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
> >> >>
> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
> 不能取到data呢?*
> >> >> >
> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
> (\n"
> >> +
> >> >> > " `data` VARCHAR , " +
> >> >> > " `table` VARCHAR " +
> >> >> > ") WITH (" +
> >> >> > " 'connector' = 'kafka'," +
> >> >> > " 'topic' = 'order_source'," +
> >> >> > " 'properties.bootstrap.servers' = '***'," +
> >> >> > " 'properties.group.id' = 'real1'," +
> >> >> > " 'format' = 'json'," +
> >> >> > " 'scan.startup.mode' = 'earliest-offset'" +
> >> >> > ")";
> >> >> >
> >> >> >
> >> >> > 具体见附件 有打印
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >> >>
> >>
>
Re:Re: 解析kafka的mysql binlog问题
Posted by air23 <wa...@163.com>.
你好。
我猜测 是有可能是这个问题。但是我这个topic是 读取的一个库的binlog。有很多表 所以ARRAY<ROW< op_id STRING, order_id STRING>>
这种 里面 不是固定的
所以我想用datastream 解析 然后在根据表不同 解析成不同的table。但是发现blinkplaner 好像不可以datastream 转换为table。或者是我没有发现这个例子
谢谢
在 2020-07-28 16:05:55,"admin" <17...@163.com> 写道:
>data格式不是string,可以定义为ARRAY<ROW< op_id STRING, order_id STRING>>
>
>> 2020年7月28日 下午3:35,air23 <wa...@163.com> 写道:
>>
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>
>>
>> {
>> "data":[
>> {
>> "op_id":"97037138",
>> "order_id":"84172164"
>> }
>> ],
>> "database":"order_11",
>> "es":1595720375000,
>> "id":17469027,
>> "isDdl":false,
>> "mysqlType":{
>> "op_id":"int(11)",
>> "order_id":"int(11)"
>> },
>> "old":null,
>> "pkNames":[
>> "op_id"
>> ],
>> "sql":"",
>> "sqlType":{
>> "op_id":4,
>> "order_id":4
>> },
>> "table":"order_product",
>> "ts":1595720375837,
>> "type":"INSERT"
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-28 14:44:35,"Jark Wu" <im...@gmail.com> 写道:
>>> 有kafka 中json 数据的样例不?
>>> 有没有看过 TaskManager 中有没有异常 log 信息?
>>>
>>>
>>>
>>> On Tue, 28 Jul 2020 at 09:40, air23 <wa...@163.com> wrote:
>>>
>>>> 你好 测试代码如下
>>>>
>>>>
>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>>> " `data` VARCHAR , " +
>>>> " `table` VARCHAR " +
>>>> ") WITH (" +
>>>> " 'connector' = 'kafka'," +
>>>> " 'topic' = 'source_databases'," +
>>>> " 'properties.bootstrap.servers' = '***'," +
>>>> " 'properties.group.id' = 'real1'," +
>>>> " 'format' = 'json'," +
>>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>>> ")";
>>>> public static void main(String[] args) throws Exception {
>>>>
>>>>
>>>> //bink table
>>>> StreamExecutionEnvironment bsEnv =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>> EnvironmentSettings bsSettings =
>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>> StreamTableEnvironment bsTableEnv =
>>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>>>
>>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>>>
>>>>
>>>> tableResult.print();
>>>>
>>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>>>
>>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>>>
>>>> bsEnv.execute("aa");
>>>>
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog
>>>> ,order_operation_time
>>>> ,inventory_batch_log
>>>> ,order_log
>>>> ,order_address_book
>>>> ,product_inventory
>>>> ,order_physical_relation
>>>> ,bil_business_attach
>>>> ,picking_detail
>>>> ,picking_detail
>>>> ,orders
>>>>
>>>>
>>>>
>>>>
>>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>>> 看到例子都是useOldPlanner 来转table的。
>>>> 致谢
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 在 2020-07-27 19:44:10,"Jark Wu" <im...@gmail.com> 写道:
>>>>> 抱歉,还是没有看到附件。
>>>>> 如果是文本的话,你可以直接贴到邮件里。
>>>>>
>>>>> On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
>>>>>
>>>>>> 我再上传一次
>>>>>>
>>>>>> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
>>>>>>
>>>>>> Hi,
>>>>>> 你的附件好像没有上传。
>>>>>>
>>>>>> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
>>>>>>
>>>>>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>>>>>>>
>>>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>>> +
>>>>>>> " `data` VARCHAR , " +
>>>>>>> " `table` VARCHAR " +
>>>>>>> ") WITH (" +
>>>>>>> " 'connector' = 'kafka'," +
>>>>>>> " 'topic' = 'order_source'," +
>>>>>>> " 'properties.bootstrap.servers' = '***'," +
>>>>>>> " 'properties.group.id' = 'real1'," +
>>>>>>> " 'format' = 'json'," +
>>>>>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>>>>>> ")";
>>>>>>>
>>>>>>>
>>>>>>> 具体见附件 有打印
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
Re: 解析kafka的mysql binlog问题
Posted by admin <17...@163.com>.
data格式不是string,可以定义为ARRAY<ROW< op_id STRING, order_id STRING>>
> 2020年7月28日 下午3:35,air23 <wa...@163.com> 写道:
>
> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
> {
> "data":[
> {
> "op_id":"97037138",
> "order_id":"84172164"
> }
> ],
> "database":"order_11",
> "es":1595720375000,
> "id":17469027,
> "isDdl":false,
> "mysqlType":{
> "op_id":"int(11)",
> "order_id":"int(11)"
> },
> "old":null,
> "pkNames":[
> "op_id"
> ],
> "sql":"",
> "sqlType":{
> "op_id":4,
> "order_id":4
> },
> "table":"order_product",
> "ts":1595720375837,
> "type":"INSERT"
> }
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-28 14:44:35,"Jark Wu" <im...@gmail.com> 写道:
>> 有kafka 中json 数据的样例不?
>> 有没有看过 TaskManager 中有没有异常 log 信息?
>>
>>
>>
>> On Tue, 28 Jul 2020 at 09:40, air23 <wa...@163.com> wrote:
>>
>>> 你好 测试代码如下
>>>
>>>
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>> " `data` VARCHAR , " +
>>> " `table` VARCHAR " +
>>> ") WITH (" +
>>> " 'connector' = 'kafka'," +
>>> " 'topic' = 'source_databases'," +
>>> " 'properties.bootstrap.servers' = '***'," +
>>> " 'properties.group.id' = 'real1'," +
>>> " 'format' = 'json'," +
>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>> ")";
>>> public static void main(String[] args) throws Exception {
>>>
>>>
>>> //bink table
>>> StreamExecutionEnvironment bsEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> StreamTableEnvironment bsTableEnv =
>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>>
>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>>
>>>
>>> tableResult.print();
>>>
>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>>
>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>>
>>> bsEnv.execute("aa");
>>>
>>> }
>>>
>>>
>>>
>>>
>>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog
>>> ,order_operation_time
>>> ,inventory_batch_log
>>> ,order_log
>>> ,order_address_book
>>> ,product_inventory
>>> ,order_physical_relation
>>> ,bil_business_attach
>>> ,picking_detail
>>> ,picking_detail
>>> ,orders
>>>
>>>
>>>
>>>
>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>> 看到例子都是useOldPlanner 来转table的。
>>> 致谢
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-07-27 19:44:10,"Jark Wu" <im...@gmail.com> 写道:
>>>> 抱歉,还是没有看到附件。
>>>> 如果是文本的话,你可以直接贴到邮件里。
>>>>
>>>> On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
>>>>
>>>>> 我再上传一次
>>>>>
>>>>> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
>>>>>
>>>>> Hi,
>>>>> 你的附件好像没有上传。
>>>>>
>>>>> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
>>>>>
>>>>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>>>>>>
>>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>> +
>>>>>> " `data` VARCHAR , " +
>>>>>> " `table` VARCHAR " +
>>>>>> ") WITH (" +
>>>>>> " 'connector' = 'kafka'," +
>>>>>> " 'topic' = 'order_source'," +
>>>>>> " 'properties.bootstrap.servers' = '***'," +
>>>>>> " 'properties.group.id' = 'real1'," +
>>>>>> " 'format' = 'json'," +
>>>>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>>>>> ")";
>>>>>>
>>>>>>
>>>>>> 具体见附件 有打印
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>
Re:回复: Re: Re: 解析kafka的mysql binlog问题
Posted by air23 <wa...@163.com>.
你好 使用的是<flink.version>1.11.1</flink.version>版本的
在 2020-07-28 16:02:30,"明启 孙" <37...@qq.com> 写道:
>你的flink什么版本
>
>发送自 Windows 10 版邮件应用
>
>发件人: air23
>发送时间: 2020年7月28日 15:36
>收件人: user-zh@flink.apache.org
>主题: Re:Re: Re: 解析kafka的mysql binlog问题
>
>格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
>{
> "data":[
> {
> "op_id":"97037138",
> "order_id":"84172164"
> }
> ],
> "database":"order_11",
> "es":1595720375000,
> "id":17469027,
> "isDdl":false,
> "mysqlType":{
> "op_id":"int(11)",
> "order_id":"int(11)"
> },
> "old":null,
> "pkNames":[
> "op_id"
> ],
> "sql":"",
> "sqlType":{
> "op_id":4,
> "order_id":4
> },
> "table":"order_product",
> "ts":1595720375837,
> "type":"INSERT"
>}
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-07-28 14:44:35,"Jark Wu" <im...@gmail.com> 写道:
>>有kafka 中json 数据的样例不?
>>有没有看过 TaskManager 中有没有异常 log 信息?
>>
>>
>>
>>On Tue, 28 Jul 2020 at 09:40, air23 <wa...@163.com> wrote:
>>
>>> 你好 测试代码如下
>>>
>>>
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>> " `data` VARCHAR , " +
>>> " `table` VARCHAR " +
>>> ") WITH (" +
>>> " 'connector' = 'kafka'," +
>>> " 'topic' = 'source_databases'," +
>>> " 'properties.bootstrap.servers' = '***'," +
>>> " 'properties.group.id' = 'real1'," +
>>> " 'format' = 'json'," +
>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>> ")";
>>> public static void main(String[] args) throws Exception {
>>>
>>>
>>> //bink table
>>> StreamExecutionEnvironment bsEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> StreamTableEnvironment bsTableEnv =
>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>>
>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>>
>>>
>>> tableResult.print();
>>>
>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>>
>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>>
>>> bsEnv.execute("aa");
>>>
>>> }
>>>
>>>
>>>
>>>
>>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog
>>> ,order_operation_time
>>> ,inventory_batch_log
>>> ,order_log
>>> ,order_address_book
>>> ,product_inventory
>>> ,order_physical_relation
>>> ,bil_business_attach
>>> ,picking_detail
>>> ,picking_detail
>>> ,orders
>>>
>>>
>>>
>>>
>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>> 看到例子都是useOldPlanner 来转table的。
>>> 致谢
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-07-27 19:44:10,"Jark Wu" <im...@gmail.com> 写道:
>>> >抱歉,还是没有看到附件。
>>> >如果是文本的话,你可以直接贴到邮件里。
>>> >
>>> >On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
>>> >
>>> >> 我再上传一次
>>> >>
>>> >> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
>>> >>
>>> >> Hi,
>>> >> 你的附件好像没有上传。
>>> >>
>>> >> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
>>> >>
>>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>>> >> >
>>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>> +
>>> >> > " `data` VARCHAR , " +
>>> >> > " `table` VARCHAR " +
>>> >> > ") WITH (" +
>>> >> > " 'connector' = 'kafka'," +
>>> >> > " 'topic' = 'order_source'," +
>>> >> > " 'properties.bootstrap.servers' = '***'," +
>>> >> > " 'properties.group.id' = 'real1'," +
>>> >> > " 'format' = 'json'," +
>>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>>> >> > ")";
>>> >> >
>>> >> >
>>> >> > 具体见附件 有打印
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >>
>>> >>
>>>
>
回复: Re: Re: 解析kafka的mysql binlog问题
Posted by 明启 孙 <37...@qq.com>.
你的flink什么版本
发送自 Windows 10 版邮件应用
发件人: air23
发送时间: 2020年7月28日 15:36
收件人: user-zh@flink.apache.org
主题: Re:Re: Re: 解析kafka的mysql binlog问题
格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
{
"data":[
{
"op_id":"97037138",
"order_id":"84172164"
}
],
"database":"order_11",
"es":1595720375000,
"id":17469027,
"isDdl":false,
"mysqlType":{
"op_id":"int(11)",
"order_id":"int(11)"
},
"old":null,
"pkNames":[
"op_id"
],
"sql":"",
"sqlType":{
"op_id":4,
"order_id":4
},
"table":"order_product",
"ts":1595720375837,
"type":"INSERT"
}
在 2020-07-28 14:44:35,"Jark Wu" <im...@gmail.com> 写道:
>有kafka 中json 数据的样例不?
>有没有看过 TaskManager 中有没有异常 log 信息?
>
>
>
>On Tue, 28 Jul 2020 at 09:40, air23 <wa...@163.com> wrote:
>
>> 你好 测试代码如下
>>
>>
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> " `data` VARCHAR , " +
>> " `table` VARCHAR " +
>> ") WITH (" +
>> " 'connector' = 'kafka'," +
>> " 'topic' = 'source_databases'," +
>> " 'properties.bootstrap.servers' = '***'," +
>> " 'properties.group.id' = 'real1'," +
>> " 'format' = 'json'," +
>> " 'scan.startup.mode' = 'earliest-offset'" +
>> ")";
>> public static void main(String[] args) throws Exception {
>>
>>
>> //bink table
>> StreamExecutionEnvironment bsEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv =
>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>
>>
>> tableResult.print();
>>
>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>
>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>
>> bsEnv.execute("aa");
>>
>> }
>>
>>
>>
>>
>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog
>> ,order_operation_time
>> ,inventory_batch_log
>> ,order_log
>> ,order_address_book
>> ,product_inventory
>> ,order_physical_relation
>> ,bil_business_attach
>> ,picking_detail
>> ,picking_detail
>> ,orders
>>
>>
>>
>>
>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> 看到例子都是useOldPlanner 来转table的。
>> 致谢
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-27 19:44:10,"Jark Wu" <im...@gmail.com> 写道:
>> >抱歉,还是没有看到附件。
>> >如果是文本的话,你可以直接贴到邮件里。
>> >
>> >On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
>> >
>> >> 我再上传一次
>> >>
>> >> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
>> >>
>> >> Hi,
>> >> 你的附件好像没有上传。
>> >>
>> >> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
>> >>
>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> >> >
>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>> +
>> >> > " `data` VARCHAR , " +
>> >> > " `table` VARCHAR " +
>> >> > ") WITH (" +
>> >> > " 'connector' = 'kafka'," +
>> >> > " 'topic' = 'order_source'," +
>> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> > " 'properties.group.id' = 'real1'," +
>> >> > " 'format' = 'json'," +
>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> > ")";
>> >> >
>> >> >
>> >> > 具体见附件 有打印
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>>
Re:Re: Re: 解析kafka的mysql binlog问题
Posted by air23 <wa...@163.com>.
格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
{
"data":[
{
"op_id":"97037138",
"order_id":"84172164"
}
],
"database":"order_11",
"es":1595720375000,
"id":17469027,
"isDdl":false,
"mysqlType":{
"op_id":"int(11)",
"order_id":"int(11)"
},
"old":null,
"pkNames":[
"op_id"
],
"sql":"",
"sqlType":{
"op_id":4,
"order_id":4
},
"table":"order_product",
"ts":1595720375837,
"type":"INSERT"
}
在 2020-07-28 14:44:35,"Jark Wu" <im...@gmail.com> 写道:
>有kafka 中json 数据的样例不?
>有没有看过 TaskManager 中有没有异常 log 信息?
>
>
>
>On Tue, 28 Jul 2020 at 09:40, air23 <wa...@163.com> wrote:
>
>> 你好 测试代码如下
>>
>>
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> " `data` VARCHAR , " +
>> " `table` VARCHAR " +
>> ") WITH (" +
>> " 'connector' = 'kafka'," +
>> " 'topic' = 'source_databases'," +
>> " 'properties.bootstrap.servers' = '***'," +
>> " 'properties.group.id' = 'real1'," +
>> " 'format' = 'json'," +
>> " 'scan.startup.mode' = 'earliest-offset'" +
>> ")";
>> public static void main(String[] args) throws Exception {
>>
>>
>> //bink table
>> StreamExecutionEnvironment bsEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv =
>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>
>>
>> tableResult.print();
>>
>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>
>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>
>> bsEnv.execute("aa");
>>
>> }
>>
>>
>>
>>
>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog
>> ,order_operation_time
>> ,inventory_batch_log
>> ,order_log
>> ,order_address_book
>> ,product_inventory
>> ,order_physical_relation
>> ,bil_business_attach
>> ,picking_detail
>> ,picking_detail
>> ,orders
>>
>>
>>
>>
>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> 看到例子都是useOldPlanner 来转table的。
>> 致谢
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-27 19:44:10,"Jark Wu" <im...@gmail.com> 写道:
>> >抱歉,还是没有看到附件。
>> >如果是文本的话,你可以直接贴到邮件里。
>> >
>> >On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
>> >
>> >> 我再上传一次
>> >>
>> >> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
>> >>
>> >> Hi,
>> >> 你的附件好像没有上传。
>> >>
>> >> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
>> >>
>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> >> >
>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>> +
>> >> > " `data` VARCHAR , " +
>> >> > " `table` VARCHAR " +
>> >> > ") WITH (" +
>> >> > " 'connector' = 'kafka'," +
>> >> > " 'topic' = 'order_source'," +
>> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> > " 'properties.group.id' = 'real1'," +
>> >> > " 'format' = 'json'," +
>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> > ")";
>> >> >
>> >> >
>> >> > 具体见附件 有打印
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>>
Re: Re: 解析kafka的mysql binlog问题
Posted by Jark Wu <im...@gmail.com>.
有kafka 中json 数据的样例不?
有没有看过 TaskManager 中有没有异常 log 信息?
On Tue, 28 Jul 2020 at 09:40, air23 <wa...@163.com> wrote:
> 你好 测试代码如下
>
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'source_databases'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
> public static void main(String[] args) throws Exception {
>
>
> //bink table
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
>
> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>
>
> tableResult.print();
>
> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>
> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>
> bsEnv.execute("aa");
>
> }
>
>
>
>
> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog
> ,order_operation_time
> ,inventory_batch_log
> ,order_log
> ,order_address_book
> ,product_inventory
> ,order_physical_relation
> ,bil_business_attach
> ,picking_detail
> ,picking_detail
> ,orders
>
>
>
>
> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> 看到例子都是useOldPlanner 来转table的。
> 致谢
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-27 19:44:10,"Jark Wu" <im...@gmail.com> 写道:
> >抱歉,还是没有看到附件。
> >如果是文本的话,你可以直接贴到邮件里。
> >
> >On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
> >
> >> 我再上传一次
> >>
> >> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
> >>
> >> Hi,
> >> 你的附件好像没有上传。
> >>
> >> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
> >>
> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
> >> >
> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
> +
> >> > " `data` VARCHAR , " +
> >> > " `table` VARCHAR " +
> >> > ") WITH (" +
> >> > " 'connector' = 'kafka'," +
> >> > " 'topic' = 'order_source'," +
> >> > " 'properties.bootstrap.servers' = '***'," +
> >> > " 'properties.group.id' = 'real1'," +
> >> > " 'format' = 'json'," +
> >> > " 'scan.startup.mode' = 'earliest-offset'" +
> >> > ")";
> >> >
> >> >
> >> > 具体见附件 有打印
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >>
>
Re:Re: 解析kafka的mysql binlog问题
Posted by air23 <wa...@163.com>.
你好 测试代码如下
private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'source_databases'," +
" 'properties.bootstrap.servers' = '***'," +
" 'properties.group.id' = 'real1'," +
" 'format' = 'json'," +
" 'scan.startup.mode' = 'earliest-offset'" +
")";
public static void main(String[] args) throws Exception {
//bink table
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
tableResult.print();
Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
bsEnv.execute("aa");
}
输出结果如下 data都是空的。数据格式为canal解析的mysql binlog
,order_operation_time
,inventory_batch_log
,order_log
,order_address_book
,product_inventory
,order_physical_relation
,bil_business_attach
,picking_detail
,picking_detail
,orders
另外再问个问题。1.11版本 blink 不能datastream转table吗?
看到例子都是useOldPlanner 来转table的。
致谢
在 2020-07-27 19:44:10,"Jark Wu" <im...@gmail.com> 写道:
>抱歉,还是没有看到附件。
>如果是文本的话,你可以直接贴到邮件里。
>
>On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
>
>> 我再上传一次
>>
>> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
>>
>> Hi,
>> 你的附件好像没有上传。
>>
>> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
>>
>> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> >
>> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> > " `data` VARCHAR , " +
>> > " `table` VARCHAR " +
>> > ") WITH (" +
>> > " 'connector' = 'kafka'," +
>> > " 'topic' = 'order_source'," +
>> > " 'properties.bootstrap.servers' = '***'," +
>> > " 'properties.group.id' = 'real1'," +
>> > " 'format' = 'json'," +
>> > " 'scan.startup.mode' = 'earliest-offset'" +
>> > ")";
>> >
>> >
>> > 具体见附件 有打印
>> >
>> >
>> >
>> >
>> >
>>
>>
Re: 解析kafka的mysql binlog问题
Posted by Jark Wu <im...@gmail.com>.
抱歉,还是没有看到附件。
如果是文本的话,你可以直接贴到邮件里。
On Mon, 27 Jul 2020 at 19:22, air23 <wa...@163.com> wrote:
> 我再上传一次
>
> 在2020年07月27日 18:55,Jark Wu <im...@gmail.com> 写道:
>
> Hi,
> 你的附件好像没有上传。
>
> On Mon, 27 Jul 2020 at 18:17, air23 <wa...@163.com> wrote:
>
> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
> >
> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> > " `data` VARCHAR , " +
> > " `table` VARCHAR " +
> > ") WITH (" +
> > " 'connector' = 'kafka'," +
> > " 'topic' = 'order_source'," +
> > " 'properties.bootstrap.servers' = '***'," +
> > " 'properties.group.id' = 'real1'," +
> > " 'format' = 'json'," +
> > " 'scan.startup.mode' = 'earliest-offset'" +
> > ")";
> >
> >
> > 具体见附件 有打印
> >
> >
> >
> >
> >
>
>