You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Michael Ran <gr...@163.com> on 2020/07/22 08:10:07 UTC

flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

dear all:
         我用flink 注册一张表:
  CREATE TABLE dim_mysql (
    id int,  -- 
    type varchar -- 
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3390/test',
    'table-name' = 'flink_test',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = '****',
    'password' = '****',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '1s',
    'lookup.max-retries' = '3'
    )
有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR] 以及属性,map<String,String> 这种。
我看阿里官方有blink 支持自定义sink:
publicabstractclassCustomSinkBaseimplementsSerializable{
protectedMap<String,String> userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写
protectedSet<String> primaryKeys;// 您定义的主键字段名
protectedList<String> headerFields;// 标记为header的字段列表
protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑

Re: Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

Posted by godfrey he <go...@gmail.com>.
1.10 也是支持的

Michael Ran <gr...@163.com> 于2020年7月22日周三 下午9:07写道:

> 1.tableEvn.from(xx).getSchema() 我确实通过这个拿到了schema,<br/>2.with
> properties属性很重要 ,关系我自定义的一些参数设定。<br/>3.关于  catalog 这个东西,是不是只有1.11
> 版本才能从catalog  获取  with properties 哦? 1.10 you  有支持吗
> 在 2020-07-22 18:22:22,"godfrey he" <go...@gmail.com> 写道:
> >tableEnv 中 可以通过
> >tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
> >如果要拿到properties,可以通过catalog的接口得到 [1]。
> >如果要自定义实现source/sink,可以参考 [2]
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
> >[2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html
> >
> >Best,
> >Godfrey
> >
> >
> >
> >
> >
> >Michael Ran <gr...@163.com> 于2020年7月22日周三 下午4:10写道:
> >
> >> dear all:
> >>          我用flink 注册一张表:
> >>   CREATE TABLE dim_mysql (
> >>     id int,  --
> >>     type varchar --
> >>     ) WITH (
> >>     'connector' = 'jdbc',
> >>     'url' = 'jdbc:mysql://localhost:3390/test',
> >>     'table-name' = 'flink_test',
> >>     'driver' = 'com.mysql.cj.jdbc.Driver',
> >>     'username' = '****',
> >>     'password' = '****',
> >>     'lookup.cache.max-rows' = '5000',
> >>     'lookup.cache.ttl' = '1s',
> >>     'lookup.max-retries' = '3'
> >>     )
> >> 有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR]
> >> 以及属性,map<String,String> 这种。
> >> 我看阿里官方有blink 支持自定义sink:
> >> publicabstractclassCustomSinkBaseimplementsSerializable{
> >> protectedMap<String,String> userParamsMap;// 您在sql
> with语句中定义的键值对,但所有的键均为小写
> >> protectedSet<String> primaryKeys;// 您定义的主键字段名
> >> protectedList<String> headerFields;// 标记为header的字段列表
> >> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
> >> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑
>

Re:Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

Posted by Michael Ran <gr...@163.com>.
1.tableEvn.from(xx).getSchema() 我确实通过这个拿到了schema,<br/>2.with properties属性很重要 ,关系我自定义的一些参数设定。<br/>3.关于  catalog 这个东西,是不是只有1.11 版本才能从catalog  获取  with properties 哦? 1.10 you  有支持吗
在 2020-07-22 18:22:22,"godfrey he" <go...@gmail.com> 写道:
>tableEnv 中 可以通过
>tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
>如果要拿到properties,可以通过catalog的接口得到 [1]。
>如果要自定义实现source/sink,可以参考 [2]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html
>
>Best,
>Godfrey
>
>
>
>
>
>Michael Ran <gr...@163.com> 于2020年7月22日周三 下午4:10写道:
>
>> dear all:
>>          我用flink 注册一张表:
>>   CREATE TABLE dim_mysql (
>>     id int,  --
>>     type varchar --
>>     ) WITH (
>>     'connector' = 'jdbc',
>>     'url' = 'jdbc:mysql://localhost:3390/test',
>>     'table-name' = 'flink_test',
>>     'driver' = 'com.mysql.cj.jdbc.Driver',
>>     'username' = '****',
>>     'password' = '****',
>>     'lookup.cache.max-rows' = '5000',
>>     'lookup.cache.ttl' = '1s',
>>     'lookup.max-retries' = '3'
>>     )
>> 有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR]
>> 以及属性,map<String,String> 这种。
>> 我看阿里官方有blink 支持自定义sink:
>> publicabstractclassCustomSinkBaseimplementsSerializable{
>> protectedMap<String,String> userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写
>> protectedSet<String> primaryKeys;// 您定义的主键字段名
>> protectedList<String> headerFields;// 标记为header的字段列表
>> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
>> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑

Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

Posted by godfrey he <go...@gmail.com>.
tableEnv 中 可以通过
tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
如果要拿到properties,可以通过catalog的接口得到 [1]。
如果要自定义实现source/sink,可以参考 [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html

Best,
Godfrey





Michael Ran <gr...@163.com> 于2020年7月22日周三 下午4:10写道:

> dear all:
>          我用flink 注册一张表:
>   CREATE TABLE dim_mysql (
>     id int,  --
>     type varchar --
>     ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://localhost:3390/test',
>     'table-name' = 'flink_test',
>     'driver' = 'com.mysql.cj.jdbc.Driver',
>     'username' = '****',
>     'password' = '****',
>     'lookup.cache.max-rows' = '5000',
>     'lookup.cache.ttl' = '1s',
>     'lookup.max-retries' = '3'
>     )
> 有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR]
> 以及属性,map<String,String> 这种。
> 我看阿里官方有blink 支持自定义sink:
> publicabstractclassCustomSinkBaseimplementsSerializable{
> protectedMap<String,String> userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写
> protectedSet<String> primaryKeys;// 您定义的主键字段名
> protectedList<String> headerFields;// 标记为header的字段列表
> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑