Posted to user@flink.apache.org by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> on 2020/03/05 03:40:22 UTC

How to use a self-defined JSON format when creating a table from a Kafka stream?

I want to register a table from a MySQL binlog like this: 
// `order` is a reserved SQL keyword, so the table name is quoted with backticks.
tEnv.sqlUpdate("CREATE TABLE `order` (\n"
    + "    order_id BIGINT,\n"
    + "    order_no VARCHAR\n"   // no trailing comma before the closing parenthesis
    + ") WITH (\n"
    + "    'connector.type' = 'kafka',\n"
    ...........
    + "    'update-mode' = 'append',\n"
    + "    'format.type' = 'json',\n"
    + "    'format.derive-schema' = 'true'\n"
    + ")");

using the following log format: 
{
  "type" : "update",
  "timestamp" : 1583373066000,
  "binlog_filename" : "mysql-bin.000453",
  "binlog_position" : 923020943,
  "database" : "wms",
  "table_name" : "t_pick_order",
  "table_id" : 131936,
  "columns" : [ {
    "id" : 1,
    "name" : "order_id",
    "column_type" : -5,
    "last_value" : 4606458,
    "value" : 4606458
  }, {
    "id" : 2,
    "name" : "order_no",
    "column_type" : 12,
    "last_value" : "EDBMFSJ00001S2003050006628",
    "value" : "EDBMFSJ00001S2003050006628"
  }]
}

Clearly, 'format.type' = 'json' on its own will not parse these records the way I expect.
Is there any way I can implement this, for example with a self-defined format class?
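A minimal sketch, in plain Java, of the mapping step such a self-defined format would have to perform: given the entries of the "columns" array (already parsed from JSON), return the "value" of each declared table field, matched by "name", in declaration order. BinlogColumnMapper and Column are hypothetical names, not Flink classes, and JSON parsing plus the Flink wiring are left out. In a real format this logic would presumably sit inside a DeserializationSchema<Row> implementation, with the resulting array turned into a Row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper, not a Flink API: maps the "columns" array of one
 * binlog record onto the fields declared in the table schema.
 */
final class BinlogColumnMapper {

    /** One element of the binlog "columns" array (only the fields used here). */
    static final class Column {
        final String name;
        final Object value;

        Column(String name, Object value) {
            this.name = name;
            this.value = value;
        }
    }

    private final List<String> fieldNames;

    /** @param fieldNames table fields in declaration order, e.g. order_id, order_no */
    BinlogColumnMapper(List<String> fieldNames) {
        this.fieldNames = new ArrayList<>(fieldNames);
    }

    /**
     * Returns the "value" of each declared field, matched by column name, in
     * declaration order. Fields missing from the record map to null; columns
     * that are not declared are ignored.
     */
    Object[] map(List<Column> columns) {
        Map<String, Object> valueByName = new HashMap<>();
        for (Column column : columns) {
            valueByName.put(column.name, column.value);
        }
        Object[] row = new Object[fieldNames.size()];
        for (int i = 0; i < row.length; i++) {
            row[i] = valueByName.get(fieldNames.get(i));
        }
        return row;
    }
}
```

For the sample record, a mapper built for "order_id" and "order_no" returns those two "value" entries in that order. Looking columns up by name rather than by array position means the result does not depend on the order in which the CDC tool emits them.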

Thanks,
Lei



wanglei2@geekplus.com.cn


Re: How to use a self-defined JSON format when creating a table from a Kafka stream?

Posted by Kurt Young <yk...@gmail.com>.
User-defined formats also sound like an interesting extension.

Best,
Kurt


On Thu, Mar 5, 2020 at 3:06 PM Jark Wu <im...@gmail.com> wrote:

> Hi Lei,
>
> Currently, Flink SQL doesn't support registering a table over a binlog
> format like this (i.e. declaring only "order_id" and "order_no" while the
> JSON also contains other binlog fields).
> This is exactly what we want to support in FLIP-105 [1] and FLIP-95.
>
> For now, if you want to consume such JSON data, you have to declare the
> full schema, e.g. "type", "timestamp", and so on.
>
> Btw, what Change Data Capture (CDC) tool are you using?
>
> Best,
> Jark
>
> [1]:
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#
>
>

Re: How to use a self-defined JSON format when creating a table from a Kafka stream?

Posted by Jark Wu <im...@gmail.com>.
Hi Lei,

Currently, Flink SQL doesn't support registering a table over a binlog format
like this (i.e. declaring only "order_id" and "order_no" while the JSON also
contains other binlog fields).
This is exactly what we want to support in FLIP-105 [1] and FLIP-95.

For now, if you want to consume such JSON data, you have to declare the full
schema, e.g. "type", "timestamp", and so on.
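To make "the full schema" concrete, here is a sketch of a DDL that declares every top-level field of the sample record and models "columns" as ARRAY<ROW<...>>, plus a query that flattens that array with CROSS JOIN UNNEST. Like the original post, it is written as Java strings, to be passed to tEnv.sqlUpdate(...) and tEnv.sqlQuery(...). The table name order_binlog is made up, the connector properties elided in the original post are still omitted, and declaring "last_value" and "value" as VARCHAR assumes the JSON format reads numeric values into strings; please verify that against your Flink version.

```java
/**
 * Sketch: full-schema DDL for the sample binlog record, plus a query that
 * flattens the "columns" array. Pass the strings to tEnv.sqlUpdate(...) and
 * tEnv.sqlQuery(...) respectively.
 */
final class BinlogOrderDdl {

    // Every field name is quoted with backticks, because several of them
    // (timestamp, value, ...) clash with SQL keywords.
    static final String CREATE_TABLE =
        "CREATE TABLE order_binlog (\n"
        + "    `type` VARCHAR,\n"
        + "    `timestamp` BIGINT,\n"
        + "    `binlog_filename` VARCHAR,\n"
        + "    `binlog_position` BIGINT,\n"
        + "    `database` VARCHAR,\n"
        + "    `table_name` VARCHAR,\n"
        + "    `table_id` BIGINT,\n"
        // Assumption: both numeric and string values can be read into VARCHAR.
        + "    `columns` ARRAY<ROW<`id` INT, `name` VARCHAR, `column_type` INT,"
        + " `last_value` VARCHAR, `value` VARCHAR>>\n"
        + ") WITH (\n"
        + "    'connector.type' = 'kafka',\n"
        // The other Kafka connector properties (elided in the original post) go here.
        + "    'update-mode' = 'append',\n"
        + "    'format.type' = 'json',\n"
        + "    'format.derive-schema' = 'true'\n"
        + ")";

    // One output row per element of the "columns" array of each binlog record.
    static final String FLATTEN_COLUMNS =
        "SELECT b.`table_name`, c.`name`, c.`value`\n"
        + "FROM order_binlog AS b\n"
        + "CROSS JOIN UNNEST(b.`columns`)"
        + " AS c (`id`, `name`, `column_type`, `last_value`, `value`)";
}
```

Each row of the flattened result holds one column name and its value, so rebuilding one row per order still takes an extra step; that is the kind of work a dedicated binlog format would make unnecessary.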

Btw, what Change Data Capture (CDC) tool are you using?

Best,
Jark

[1]:
https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#

