Posted to user-zh@flink.apache.org by wldd <wl...@163.com> on 2020/07/14 09:58:39 UTC

Flink 1.11.0: decimal read from MySQL is force-cast to decimal(38,18)

Hi all:
I've run into a problem: when reading MySQL data through the sql-client, a decimal column is force-cast to decimal(38,18).
MySQL DDL:
CREATE TABLE `test2` (
  `money` decimal(10,2) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


insert into test2 values(10.22);


Flink DDL:
CREATE TABLE test2 (
    money decimal(10, 2)
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/test',
    'connector.table' = 'test2',
    'connector.username' = 'root',
    'connector.password' = 'root'
);
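For reference, the widening the streaming plan applies (the `CAST($0):DECIMAL(38, 18)` visible in the debug logs later in the thread) changes the declared precision and scale but not the stored value. A small plain-Java sketch of what that means for the sample row, using java.math.BigDecimal outside of Flink entirely:

```java
// Illustrative only: mirrors what CAST(money AS DECIMAL(38, 18)) does to the
// sample row, using plain java.math.BigDecimal (no Flink involved).
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalWidening {
    public static void main(String[] args) {
        BigDecimal asDecimal10x2 = new BigDecimal("10.22"); // DECIMAL(10,2)
        // Widen to 18 fractional digits, as the streaming plan's cast does.
        BigDecimal asDecimal38x18 = asDecimal10x2.setScale(18, RoundingMode.UNNECESSARY);

        System.out.println(asDecimal10x2);   // 10.22
        System.out.println(asDecimal38x18);  // 10.220000000000000000

        // The numeric value is identical; only the declared scale differs.
        System.out.println(asDecimal10x2.compareTo(asDecimal38x18) == 0);  // true
    }
}
```

So the cast is lossless for this value; the complaint in this thread is about the declared type changing, not about data corruption.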


Flink query result, streaming mode:
sql: select * from test2;


Debug info: (screenshots; they did not come through in plain text)
--

Best,
wldd

Re: Flink 1.11.0: decimal read from MySQL is force-cast to decimal(38,18)

Posted by Leonard Xu <xb...@gmail.com>.
Hi,
I took a look at the code: streaming mode indeed has this problem and batch does not. The reason:

CollectStreamTableSink implements TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType()
CollectBatchTableSink implements DataType getConsumedDataType()

I just searched: the community has an issue [1] that fixes this for good. Godfrey has already opened a PR; it should remove both of these CollectSinks and use TableResult#collect() to collect the data.

Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18550
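The root cause contrasted above can be illustrated with a toy model. These classes are illustrative stand-ins, not Flink's real TypeInformation/DataType API: a legacy type-information token only knows the Java class, so mapping it back to a SQL type has to pick a default precision and scale, which for decimals is (38, 18); a DataType carries the original (p, s) through.

```java
// Toy model of the two sink signatures contrasted above. These classes are
// illustrative stand-ins, NOT Flink's real TypeInformation/DataType API.
public class LegacyTypeLoss {
    // Legacy path: only the Java class survives; precision/scale are lost.
    record LegacyTypeInfo(Class<?> javaClass) {}

    // New path: the logical type keeps precision and scale.
    record DecimalDataType(int precision, int scale) {}

    // Going back from a bare class token to a SQL type must guess defaults.
    static DecimalDataType fromLegacy(LegacyTypeInfo info) {
        if (info.javaClass() == java.math.BigDecimal.class) {
            return new DecimalDataType(38, 18); // the only choice the legacy path has
        }
        throw new IllegalArgumentException("not a decimal type");
    }

    public static void main(String[] args) {
        DecimalDataType original = new DecimalDataType(10, 2);

        // The streaming collect sink routes through getOutputType(), i.e. the
        // class-token path, so the original (10, 2) cannot be recovered:
        LegacyTypeInfo legacy = new LegacyTypeInfo(java.math.BigDecimal.class);
        DecimalDataType recovered = fromLegacy(legacy);

        System.out.println(original);   // DecimalDataType[precision=10, scale=2]
        System.out.println(recovered);  // DecimalDataType[precision=38, scale=18]
    }
}
```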





Re:Re:Re: Flink 1.11.0: decimal read from MySQL is force-cast to decimal(38,18)

Posted by wldd <wl...@163.com>.
Hi:
Batch mode result:
https://imgchr.com/i/UUqec6
Batch mode log:
https://imgchr.com/i/UUboX8
Streaming mode log:
https://imgchr.com/i/UUbYmF

--

Best,
wldd





Re:Re: Flink 1.11.0: decimal read from MySQL is force-cast to decimal(38,18)

Posted by wldd <wl...@163.com>.
Hi:
The images didn't come through; their content was just the query result.


Here is the batch-mode DEBUG log:
2020-07-14 18:33:23,180 DEBUG org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input splitting configured (data will be read with parallelism 1).
2020-07-14 18:33:23,181 DEBUG org.apache.calcite.sql2rel                                   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,197 DEBUG org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat [] - No input splitting configured (data will be read with parallelism 1).
2020-07-14 18:33:23,198 DEBUG org.apache.calcite.sql2rel                                   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,201 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - iteration: 1
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#2907:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2906,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#2905:LogicalProject.NONE.any.[](input=HepRelVertex#2904,inputs=0)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
2020-07-14 18:33:23,202 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - optimize convert table references before rewriting sub-queries to semi-join cost 1 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[$0])
   +- LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#2912:LogicalLegacySink.NONE.any.[](input=HepRelVertex#2911,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#2910:LogicalProject.NONE.any.[](input=HepRelVertex#2909,inputs=0)
2020-07-14 18:33:23,202 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#2901:LogicalTableScan.NONE.any.[](table=[mydb, test, test2])
2020-07-14 18:33:23,202 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - optimize rewrite sub-queries to semi-join cost 0 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[$0])
   +- LogicalTableScan(table=[[mydb, test, test2]])








Here is the streaming-mode DEBUG log:
2020-07-14 18:35:45,995 DEBUG org.apache.calcite.sql2rel                                   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:35:46,015 DEBUG org.apache.calcite.sql2rel                                   [] - Plan after converting SqlNode to RelNode
LogicalProject(money=[$0])
  LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils                 [] - Trying to connect to (mosh-data-1/192.168.0.29:6123) from local address mosh-data-1/192.168.0.29 with timeout 200
2020-07-14 18:35:46,016 DEBUG org.apache.flink.runtime.net.ConnectionUtils                 [] - Using InetAddress.getLocalHost() immediately for the connecting address
2020-07-14 18:35:46,022 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - iteration: 1
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#3047:LogicalLegacySink.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#3046,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#3045:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#3044,exprs=[CAST($0):DECIMAL(38, 18)])
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#3040:LogicalTableScan.NONE.any.None: 0.[NONE].[NONE](table=[mydb, test, test2])
2020-07-14 18:35:46,022 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - optimize convert table references before rewriting sub-queries to semi-join cost 0 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[CAST($0):DECIMAL(38, 18)])
   +- LogicalTableScan(table=[[mydb, test, test2]])


2020-07-14 18:35:46,023 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#3052:LogicalLegacySink.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#3051,name=`mydb`.`test`.`_tmp_table_380133308`,fields=money)
2020-07-14 18:35:46,023 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#3050:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#3049,exprs=[CAST($0):DECIMAL(38, 18)])
2020-07-14 18:35:46,023 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#3040:LogicalTableScan.NONE.any.None: 0.[NONE].[NONE](table=[mydb, test, test2])
2020-07-14 18:35:46,023 DEBUG org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram [] - optimize rewrite sub-queries to semi-join cost 1 ms.
optimize result:
 LogicalLegacySink(name=[`mydb`.`test`.`_tmp_table_380133308`], fields=[money])
+- LogicalProject(money=[CAST($0):DECIMAL(38, 18)])
   +- LogicalTableScan(table=[[mydb, test, test2]])







The key difference is this line in streaming mode:
2020-07-14 18:35:46,022 DEBUG org.apache.calcite.plan.RelOptPlanner                        [] - For final plan, using rel#3045:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=HepRelVertex#3044,exprs=[CAST($0):DECIMAL(38, 18)])



--

Best,
wldd






Re: Flink 1.11.0: decimal read from MySQL is force-cast to decimal(38,18)

Posted by Leonard Xu <xb...@gmail.com>.
Hi,

The images in the earlier mails are all broken. In theory the SQL Client always does this cast. Could you post links to an image host, or paste code that reproduces it?

Best



Re:Re: Flink 1.11.0: decimal read from MySQL is force-cast to decimal(38,18)

Posted by wldd <wl...@163.com>.
Hi,
Doesn't batch mode use the legacy data types as well? Batch mode does not force-cast the decimal.










--

Best,
wldd






Re: Flink 1.11.0: decimal read from MySQL is force-cast to decimal(38,18)

Posted by Leonard Xu <xb...@gmail.com>.
Hi,

The part of the SQL client that reads MySQL acts as a connector, and that connector only supports DECIMAL(38,18): every DECIMAL(p, s) is converted to this type, because the SQL Client still uses the legacy data types.
You can read the data with another connector such as Filesystem or Kafka; the community already has an issue [1] tracking this.

Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-17948

> On 2020/07/14 17:58, wldd <wl...@163.com> wrote:
> 
> sql-client