You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by zhisheng <zh...@gmail.com> on 2020/03/18 14:21:26 UTC

Field types of query result and registered TableSink [Result] do not match

hi, all

我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL yidun_score
字段也是定义的 numeric(5,2) 类型,结果会报异常。

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Field types of query result and registered TableSink
[Result] do not match.
Query result schema: [user_new_id: Long, total_credit_score: Integer,
total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
BigDecimal, is_delete: Boolean]
TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
BigDecimal, is_delete: Boolean]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.table.api.ValidationException: Field types of
query result and registered TableSink [Result] do not match.
Query result schema: [user_new_id: Long, total_credit_score: Integer,
total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
BigDecimal, is_delete: Boolean]
TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
BigDecimal, is_delete: Boolean]
at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
at scala.Option.map(Option.scala:146)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)

我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case 是不是一个 bug?

Re: Field types of query result and registered TableSink [Result] do not match

Posted by zhisheng <zh...@gmail.com>.
好的,了解了,多谢 Jark

Jark Wu <im...@gmail.com> 于2020年3月19日周四 上午10:39写道:

> Hi zhisheng,
>
> 目前 kafka source & jdbc sink 都是用的 TypeInformation ,所以都只能声明成 38, 18 或者直接写
> DECIMAL ,默认就是 38, 18。
> 这个问题会在升级到 new source/sink interface (FLIP-95)后有效解决。
>
> Best,
> Jark
>
> On Thu, 19 Mar 2020 at 10:31, zhisheng <zh...@gmail.com> wrote:
>
> > hi, Jark
> >
> > 我只是使用了 flink-jdbc 这个 connector,发下我本地测试的 DDL 和 SQL 如下:
> >
> > String ddlSource = "CREATE TABLE test (\n" +
> >                 "    yidun_score numeric(5, 2)\n" +
> >                 ") WITH (\n" +
> >                 "    'connector.type' = 'kafka',\n" +
> >                 "    'connector.version' = '0.11',\n" +
> >                 "    'connector.topic' = 'test',\n" +
> >                 "    'connector.startup-mode' = 'latest-offset',\n" +
> >                 "    'connector.properties.zookeeper.connect' =
> > 'localhost:2181',\n" +
> >                 "    'connector.properties.bootstrap.servers' =
> > 'localhost:9092',\n" +
> >                 "    'format.type' = 'json'\n" +
> >                 ")";
> >
> >         String ddlSink = "CREATE TABLE test_aggregate (\n" +
> >                 "    yidun_score numeric(5, 2)\n" +
> >                 ") WITH (\n" +
> >                 "    'connector.type' = 'jdbc',\n" +
> >                 "    'connector.driver' = 'org.postgresql.Driver',\n" +
> >                 "    'connector.url' =
> > 'jdbc:postgresql://localhost:3600/test',\n" +
> >                 "    'connector.table' = 'test_aggregate', \n" +
> >                 "    'connector.username' = 'admin', \n" +
> >                 "    'connector.password' = '1234546',\n" +
> >                 "    'connector.write.flush.max-rows' = '1' \n" +
> >                 ")";
> >
> >         String sql = "insert into test_aggregate select yidun_score from
> > test";
> >
> >         blinkStreamTableEnv.sqlUpdate(ddlSource);
> >         blinkStreamTableEnv.sqlUpdate(ddlSink);
> >         blinkStreamTableEnv.sqlUpdate(sql);
> >
> > 没有自定义过 TableSink
> >
> >
> >
> > Jark Wu <im...@gmail.com> 于2020年3月19日周四 上午9:43写道:
> >
> > > Hi zhisheng,
> > >
> > > 我猜测你的 TableSink 的实现用了 legacy type, i.e. TypeInformation, 而不是 DataType。
> > > legacy type 是无法表达精度的,所以只能 mapping 到默认的 38, 18 上。
> > > 这是框架做的一个合法性校验。
> > >
> > > Best,
> > > Jark
> > >
> > > On Thu, 19 Mar 2020 at 09:33, zhisheng <zh...@gmail.com> wrote:
> > >
> > > > hi, Jark
> > > >
> > > > 我刚使用 1.10.0 测试,报错异常如下:
> > > >
> > > > Exception in thread "main"
> > > org.apache.flink.table.api.ValidationException:
> > > > Type DECIMAL(5, 2) of table field 'yidun_score' does not match with
> the
> > > > physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field
> > of
> > > > the TableSink consumed type.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287)
> > > > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
> > > > at scala.Option.map(Option.scala:146)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > > > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> > > > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> > > > at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49)
> > > > Caused by: org.apache.flink.table.api.ValidationException: Legacy
> > decimal
> > > > type can only be mapped to DECIMAL(38, 18).
> > > > ... 26 more
> > > >
> > > > 看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成 DECIMAL(38, 18)
> 类型,就不报错了。
> > > >
> > > > 看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验
> > > >
> > > >
> > http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png
> > > >
> > > > 看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说:
> > > >
> > > > Checks whether the given physical field type and logical field type
> are
> > > > compatible at the edges of the table ecosystem. Types are still
> > > compatible
> > > > if the physical type is a legacy decimal type (converted from
> > > > Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to
> > > support
> > > > legacy TypeInformation for TableSource and TableSink.
> > > >
> > > > 看起来像是在兼容旧的 TypeInformation
> > > >
> > > > zhisheng <zh...@gmail.com> 于2020年3月19日周四 上午8:31写道:
> > > >
> > > > > 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner
> > > > >
> > > > > Jark Wu <im...@gmail.com> 于2020年3月18日周三 下午11:47写道:
> > > > >
> > > > >> Hi zhisheng,
> > > > >>
> > > > >> 你用的是1.9吗? 试过 1.10.0 blink planner 么?
> > > > >>
> > > > >> On Wed, 18 Mar 2020 at 22:21, zhisheng <zh...@gmail.com>
> > > wrote:
> > > > >>
> > > > >> > hi, all
> > > > >> >
> > > > >> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL
> > > > >> yidun_score
> > > > >> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。
> > > > >> >
> > > > >> > org.apache.flink.client.program.ProgramInvocationException: The
> > main
> > > > >> method
> > > > >> > caused an error: Field types of query result and registered
> > > TableSink
> > > > >> > [Result] do not match.
> > > > >> > Query result schema: [user_new_id: Long, total_credit_score:
> > > Integer,
> > > > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > > > >> yidun_score:
> > > > >> > BigDecimal, is_delete: Boolean]
> > > > >> > TableSink schema:    [user_new_id: Long, total_credit_score:
> > > Integer,
> > > > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > > > >> yidun_score:
> > > > >> > BigDecimal, is_delete: Boolean]
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
> > > > >> > at
> > > > >>
> > >
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
> > > > >> > at
> > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> > > > >> > at java.security.AccessController.doPrivileged(Native Method)
> > > > >> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > > >> > at
> > > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> > > > >> > Caused by: org.apache.flink.table.api.ValidationException: Field
> > > types
> > > > >> of
> > > > >> > query result and registered TableSink [Result] do not match.
> > > > >> > Query result schema: [user_new_id: Long, total_credit_score:
> > > Integer,
> > > > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > > > >> yidun_score:
> > > > >> > BigDecimal, is_delete: Boolean]
> > > > >> > TableSink schema:    [user_new_id: Long, total_credit_score:
> > > Integer,
> > > > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > > > >> yidun_score:
> > > > >> > BigDecimal, is_delete: Boolean]
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
> > > > >> > at scala.Option.map(Option.scala:146)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > > >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > > > >> >
> > > > >> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个
> case
> > > > 是不是一个
> > > > >> > bug?
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>

Re: Field types of query result and registered TableSink [Result] do not match

Posted by Jark Wu <im...@gmail.com>.
Hi zhisheng,

目前 kafka source & jdbc sink 都是用的 TypeInformation ,所以都只能声明成 38, 18 或者直接写
DECIMAL ,默认就是 38, 18。
这个问题会在升级到 new source/sink interface (FLIP-95)后有效解决。

Best,
Jark

On Thu, 19 Mar 2020 at 10:31, zhisheng <zh...@gmail.com> wrote:

> hi, Jark
>
> 我只是使用了 flink-jdbc 这个 connector,发下我本地测试的 DDL 和 SQL 如下:
>
> String ddlSource = "CREATE TABLE test (\n" +
>                 "    yidun_score numeric(5, 2)\n" +
>                 ") WITH (\n" +
>                 "    'connector.type' = 'kafka',\n" +
>                 "    'connector.version' = '0.11',\n" +
>                 "    'connector.topic' = 'test',\n" +
>                 "    'connector.startup-mode' = 'latest-offset',\n" +
>                 "    'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
>                 "    'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
>                 "    'format.type' = 'json'\n" +
>                 ")";
>
>         String ddlSink = "CREATE TABLE test_aggregate (\n" +
>                 "    yidun_score numeric(5, 2)\n" +
>                 ") WITH (\n" +
>                 "    'connector.type' = 'jdbc',\n" +
>                 "    'connector.driver' = 'org.postgresql.Driver',\n" +
>                 "    'connector.url' =
> 'jdbc:postgresql://localhost:3600/test',\n" +
>                 "    'connector.table' = 'test_aggregate', \n" +
>                 "    'connector.username' = 'admin', \n" +
>                 "    'connector.password' = '1234546',\n" +
>                 "    'connector.write.flush.max-rows' = '1' \n" +
>                 ")";
>
>         String sql = "insert into test_aggregate select yidun_score from
> test";
>
>         blinkStreamTableEnv.sqlUpdate(ddlSource);
>         blinkStreamTableEnv.sqlUpdate(ddlSink);
>         blinkStreamTableEnv.sqlUpdate(sql);
>
> 没有自定义过 TableSink
>
>
>
> Jark Wu <im...@gmail.com> 于2020年3月19日周四 上午9:43写道:
>
> > Hi zhisheng,
> >
> > 我猜测你的 TableSink 的实现用了 legacy type, i.e. TypeInformation, 而不是 DataType。
> > legacy type 是无法表达精度的,所以只能 mapping 到默认的 38, 18 上。
> > 这是框架做的一个合法性校验。
> >
> > Best,
> > Jark
> >
> > On Thu, 19 Mar 2020 at 09:33, zhisheng <zh...@gmail.com> wrote:
> >
> > > hi, Jark
> > >
> > > 我刚使用 1.10.0 测试,报错异常如下:
> > >
> > > Exception in thread "main"
> > org.apache.flink.table.api.ValidationException:
> > > Type DECIMAL(5, 2) of table field 'yidun_score' does not match with the
> > > physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field
> of
> > > the TableSink consumed type.
> > > at
> > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> > > at
> > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265)
> > > at
> > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254)
> > > at
> > >
> > >
> >
> org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102)
> > > at
> > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> > > at
> > >
> > >
> >
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287)
> > > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
> > > at scala.Option.map(Option.scala:146)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > > at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> > > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> > > at
> > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> > > at
> > >
> > >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> > > at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49)
> > > Caused by: org.apache.flink.table.api.ValidationException: Legacy
> decimal
> > > type can only be mapped to DECIMAL(38, 18).
> > > ... 26 more
> > >
> > > 看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成 DECIMAL(38, 18) 类型,就不报错了。
> > >
> > > 看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验
> > >
> > >
> http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png
> > >
> > > 看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说:
> > >
> > > Checks whether the given physical field type and logical field type are
> > > compatible at the edges of the table ecosystem. Types are still
> > compatible
> > > if the physical type is a legacy decimal type (converted from
> > > Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to
> > support
> > > legacy TypeInformation for TableSource and TableSink.
> > >
> > > 看起来像是在兼容旧的 TypeInformation
> > >
> > > zhisheng <zh...@gmail.com> 于2020年3月19日周四 上午8:31写道:
> > >
> > > > 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner
> > > >
> > > > Jark Wu <im...@gmail.com> 于2020年3月18日周三 下午11:47写道:
> > > >
> > > >> Hi zhisheng,
> > > >>
> > > >> 你用的是1.9吗? 试过 1.10.0 blink planner 么?
> > > >>
> > > >> On Wed, 18 Mar 2020 at 22:21, zhisheng <zh...@gmail.com>
> > wrote:
> > > >>
> > > >> > hi, all
> > > >> >
> > > >> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL
> > > >> yidun_score
> > > >> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。
> > > >> >
> > > >> > org.apache.flink.client.program.ProgramInvocationException: The
> main
> > > >> method
> > > >> > caused an error: Field types of query result and registered
> > TableSink
> > > >> > [Result] do not match.
> > > >> > Query result schema: [user_new_id: Long, total_credit_score:
> > Integer,
> > > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > > >> yidun_score:
> > > >> > BigDecimal, is_delete: Boolean]
> > > >> > TableSink schema:    [user_new_id: Long, total_credit_score:
> > Integer,
> > > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > > >> yidun_score:
> > > >> > BigDecimal, is_delete: Boolean]
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
> > > >> > at
> > > >>
> > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
> > > >> > at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> > > >> > at java.security.AccessController.doPrivileged(Native Method)
> > > >> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > >> > at
> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> > > >> > Caused by: org.apache.flink.table.api.ValidationException: Field
> > types
> > > >> of
> > > >> > query result and registered TableSink [Result] do not match.
> > > >> > Query result schema: [user_new_id: Long, total_credit_score:
> > Integer,
> > > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > > >> yidun_score:
> > > >> > BigDecimal, is_delete: Boolean]
> > > >> > TableSink schema:    [user_new_id: Long, total_credit_score:
> > Integer,
> > > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > > >> yidun_score:
> > > >> > BigDecimal, is_delete: Boolean]
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
> > > >> > at scala.Option.map(Option.scala:146)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > >> > at
> > > >> >
> > > >> >
> > > >>
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > > >> >
> > > >> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case
> > > 是不是一个
> > > >> > bug?
> > > >> >
> > > >>
> > > >
> > >
> >
>

Re: Field types of query result and registered TableSink [Result] do not match

Posted by zhisheng <zh...@gmail.com>.
hi, Jark

我只是使用了 flink-jdbc 这个 connector,发下我本地测试的 DDL 和 SQL 如下:

String ddlSource = "CREATE TABLE test (\n" +
                "    yidun_score numeric(5, 2)\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = '0.11',\n" +
                "    'connector.topic' = 'test',\n" +
                "    'connector.startup-mode' = 'latest-offset',\n" +
                "    'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
                "    'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
                "    'format.type' = 'json'\n" +
                ")";

        String ddlSink = "CREATE TABLE test_aggregate (\n" +
                "    yidun_score numeric(5, 2)\n" +
                ") WITH (\n" +
                "    'connector.type' = 'jdbc',\n" +
                "    'connector.driver' = 'org.postgresql.Driver',\n" +
                "    'connector.url' =
'jdbc:postgresql://localhost:3600/test',\n" +
                "    'connector.table' = 'test_aggregate', \n" +
                "    'connector.username' = 'admin', \n" +
                "    'connector.password' = '1234546',\n" +
                "    'connector.write.flush.max-rows' = '1' \n" +
                ")";

        String sql = "insert into test_aggregate select yidun_score from
test";

        blinkStreamTableEnv.sqlUpdate(ddlSource);
        blinkStreamTableEnv.sqlUpdate(ddlSink);
        blinkStreamTableEnv.sqlUpdate(sql);

没有自定义过 TableSink



Jark Wu <im...@gmail.com> 于2020年3月19日周四 上午9:43写道:

> Hi zhisheng,
>
> 我猜测你的 TableSink 的实现用了 legacy type, i.e. TypeInformation, 而不是 DataType。
> legacy type 是无法表达精度的,所以只能 mapping 到默认的 38, 18 上。
> 这是框架做的一个合法性校验。
>
> Best,
> Jark
>
> On Thu, 19 Mar 2020 at 09:33, zhisheng <zh...@gmail.com> wrote:
>
> > hi, Jark
> >
> > 我刚使用 1.10.0 测试,报错异常如下:
> >
> > Exception in thread "main"
> org.apache.flink.table.api.ValidationException:
> > Type DECIMAL(5, 2) of table field 'yidun_score' does not match with the
> > physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field of
> > the TableSink consumed type.
> > at
> >
> >
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> > at
> >
> >
> org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265)
> > at
> >
> >
> org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254)
> > at
> >
> >
> org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102)
> > at
> >
> >
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> > at
> >
> >
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> > at
> >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287)
> > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> > at
> >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
> > at scala.Option.map(Option.scala:146)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> > at
> >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> > at
> >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> > at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49)
> > Caused by: org.apache.flink.table.api.ValidationException: Legacy decimal
> > type can only be mapped to DECIMAL(38, 18).
> > ... 26 more
> >
> > 看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成 DECIMAL(38, 18) 类型,就不报错了。
> >
> > 看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验
> >
> > http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png
> >
> > 看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说:
> >
> > Checks whether the given physical field type and logical field type are
> > compatible at the edges of the table ecosystem. Types are still
> compatible
> > if the physical type is a legacy decimal type (converted from
> > Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to
> support
> > legacy TypeInformation for TableSource and TableSink.
> >
> > 看起来像是在兼容旧的 TypeInformation
> >
> > zhisheng <zh...@gmail.com> 于2020年3月19日周四 上午8:31写道:
> >
> > > 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner
> > >
> > > Jark Wu <im...@gmail.com> 于2020年3月18日周三 下午11:47写道:
> > >
> > >> Hi zhisheng,
> > >>
> > >> 你用的是1.9吗? 试过 1.10.0 blink planner 么?
> > >>
> > >> On Wed, 18 Mar 2020 at 22:21, zhisheng <zh...@gmail.com>
> wrote:
> > >>
> > >> > hi, all
> > >> >
> > >> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL
> > >> yidun_score
> > >> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。
> > >> >
> > >> > org.apache.flink.client.program.ProgramInvocationException: The main
> > >> method
> > >> > caused an error: Field types of query result and registered
> TableSink
> > >> > [Result] do not match.
> > >> > Query result schema: [user_new_id: Long, total_credit_score:
> Integer,
> > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > >> yidun_score:
> > >> > BigDecimal, is_delete: Boolean]
> > >> > TableSink schema:    [user_new_id: Long, total_credit_score:
> Integer,
> > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > >> yidun_score:
> > >> > BigDecimal, is_delete: Boolean]
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
> > >> > at
> > >>
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
> > >> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> > >> > at java.security.AccessController.doPrivileged(Native Method)
> > >> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > >> > at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> > >> > Caused by: org.apache.flink.table.api.ValidationException: Field
> types
> > >> of
> > >> > query result and registered TableSink [Result] do not match.
> > >> > Query result schema: [user_new_id: Long, total_credit_score:
> Integer,
> > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > >> yidun_score:
> > >> > BigDecimal, is_delete: Boolean]
> > >> > TableSink schema:    [user_new_id: Long, total_credit_score:
> Integer,
> > >> > total_order_count: Integer, loss_total_order_count: Integer,
> > >> yidun_score:
> > >> > BigDecimal, is_delete: Boolean]
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
> > >> > at scala.Option.map(Option.scala:146)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > >> > at
> > >> >
> > >> >
> > >>
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > >> >
> > >> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case
> > 是不是一个
> > >> > bug?
> > >> >
> > >>
> > >
> >
>

Re: Field types of query result and registered TableSink [Result] do not match

Posted by Jark Wu <im...@gmail.com>.
Hi zhisheng,

我猜测你的 TableSink 的实现用了 legacy type, i.e. TypeInformation, 而不是 DataType。
legacy type 是无法表达精度的,所以只能 mapping 到默认的 38, 18 上。
这是框架做的一个合法性校验。

Best,
Jark

On Thu, 19 Mar 2020 at 09:33, zhisheng <zh...@gmail.com> wrote:

> hi, Jark
>
> 我刚使用 1.10.0 测试,报错异常如下:
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type DECIMAL(5, 2) of table field 'yidun_score' does not match with the
> physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field of
> the TableSink consumed type.
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254)
> at
>
> org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
> at
>
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
> at
>
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> at
>
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
> at scala.Option.map(Option.scala:146)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49)
> Caused by: org.apache.flink.table.api.ValidationException: Legacy decimal
> type can only be mapped to DECIMAL(38, 18).
> ... 26 more
>
> 看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成 DECIMAL(38, 18) 类型,就不报错了。
>
> 看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验
>
> http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png
>
> 看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说:
>
> Checks whether the given physical field type and logical field type are
> compatible at the edges of the table ecosystem. Types are still compatible
> if the physical type is a legacy decimal type (converted from
> Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to support
> legacy TypeInformation for TableSource and TableSink.
>
> 看起来像是在兼容旧的 TypeInformation
>
> zhisheng <zh...@gmail.com> 于2020年3月19日周四 上午8:31写道:
>
> > 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner
> >
> > Jark Wu <im...@gmail.com> 于2020年3月18日周三 下午11:47写道:
> >
> >> Hi zhisheng,
> >>
> >> 你用的是1.9吗? 试过 1.10.0 blink planner 么?
> >>
> >> On Wed, 18 Mar 2020 at 22:21, zhisheng <zh...@gmail.com> wrote:
> >>
> >> > hi, all
> >> >
> >> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL
> >> yidun_score
> >> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。
> >> >
> >> > org.apache.flink.client.program.ProgramInvocationException: The main
> >> method
> >> > caused an error: Field types of query result and registered TableSink
> >> > [Result] do not match.
> >> > Query result schema: [user_new_id: Long, total_credit_score: Integer,
> >> > total_order_count: Integer, loss_total_order_count: Integer,
> >> yidun_score:
> >> > BigDecimal, is_delete: Boolean]
> >> > TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
> >> > total_order_count: Integer, loss_total_order_count: Integer,
> >> yidun_score:
> >> > BigDecimal, is_delete: Boolean]
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
> >> > at
> >> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
> >> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> >> > at java.security.AccessController.doPrivileged(Native Method)
> >> > at javax.security.auth.Subject.doAs(Subject.java:422)
> >> > at
> >> >
> >> >
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> >> > Caused by: org.apache.flink.table.api.ValidationException: Field types
> >> of
> >> > query result and registered TableSink [Result] do not match.
> >> > Query result schema: [user_new_id: Long, total_credit_score: Integer,
> >> > total_order_count: Integer, loss_total_order_count: Integer,
> >> yidun_score:
> >> > BigDecimal, is_delete: Boolean]
> >> > TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
> >> > total_order_count: Integer, loss_total_order_count: Integer,
> >> yidun_score:
> >> > BigDecimal, is_delete: Boolean]
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
> >> > at scala.Option.map(Option.scala:146)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> >> > at
> >> >
> >> >
> >>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> >> > at
> >> >
> >> >
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> > at
> >> >
> >> >
> >>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >> >
> >> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case
> 是不是一个
> >> > bug?
> >> >
> >>
> >
>

Re: Field types of query result and registered TableSink [Result] do not match

Posted by zhisheng <zh...@gmail.com>.
hi, Jark

我刚使用 1.10.0 测试,报错异常如下:

Exception in thread "main" org.apache.flink.table.api.ValidationException:
Type DECIMAL(5, 2) of table field 'yidun_score' does not match with the
physical type LEGACY('DECIMAL', 'DECIMAL') of the 'yidun_score' field of
the TableSink consumed type.
at
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
at
org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:265)
at
org.apache.flink.table.utils.TypeMappingUtils$1.visit(TypeMappingUtils.java:254)
at
org.apache.flink.table.types.logical.LegacyTypeInformationType.accept(LegacyTypeInformationType.java:102)
at
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
at
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:287)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:280)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
at scala.Option.map(Option.scala:146)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.zhisheng.sql.blink.stream.example.Test.main(Test.java:49)
Caused by: org.apache.flink.table.api.ValidationException: Legacy decimal
type can only be mapped to DECIMAL(38, 18).
... 26 more

看报错日志像是只能使用 DECIMAL(38, 18),不然映射会失败,然后我将表定义成 DECIMAL(38, 18) 类型,就不报错了。

看了下源码是 TypeMappingUtils 类中的 checkIfCompatible 做了校验

http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-19-012349.png

看了下这个 private 方法的上层是 checkPhysicalLogicalTypeCompatible 方法,这个方法的注释说:

Checks whether the given physical field type and logical field type are
compatible at the edges of the table ecosystem. Types are still compatible
if the physical type is a legacy decimal type (converted from
Types#BIG_DEC) and the logical type is DECIMAL(38, 18). This is to support
legacy TypeInformation for TableSource and TableSink.

看起来像是在兼容旧的 TypeInformation

zhisheng <zh...@gmail.com> 于2020年3月19日周四 上午8:31写道:

> 对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner
>
> Jark Wu <im...@gmail.com> 于2020年3月18日周三 下午11:47写道:
>
>> Hi zhisheng,
>>
>> 你用的是1.9吗? 试过 1.10.0 blink planner 么?
>>
>> On Wed, 18 Mar 2020 at 22:21, zhisheng <zh...@gmail.com> wrote:
>>
>> > hi, all
>> >
>> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL
>> yidun_score
>> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> method
>> > caused an error: Field types of query result and registered TableSink
>> > [Result] do not match.
>> > Query result schema: [user_new_id: Long, total_credit_score: Integer,
>> > total_order_count: Integer, loss_total_order_count: Integer,
>> yidun_score:
>> > BigDecimal, is_delete: Boolean]
>> > TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
>> > total_order_count: Integer, loss_total_order_count: Integer,
>> yidun_score:
>> > BigDecimal, is_delete: Boolean]
>> > at
>> >
>> >
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>> > at
>> >
>> >
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> > at
>> >
>> >
>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>> > at
>> >
>> >
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>> > at
>> >
>> >
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
>> > at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
>> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>> > at
>> >
>> >
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>> > at
>> >
>> >
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>> > at java.security.AccessController.doPrivileged(Native Method)
>> > at javax.security.auth.Subject.doAs(Subject.java:422)
>> > at
>> >
>> >
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>> > at
>> >
>> >
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> > Caused by: org.apache.flink.table.api.ValidationException: Field types
>> of
>> > query result and registered TableSink [Result] do not match.
>> > Query result schema: [user_new_id: Long, total_credit_score: Integer,
>> > total_order_count: Integer, loss_total_order_count: Integer,
>> yidun_score:
>> > BigDecimal, is_delete: Boolean]
>> > TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
>> > total_order_count: Integer, loss_total_order_count: Integer,
>> yidun_score:
>> > BigDecimal, is_delete: Boolean]
>> > at
>> >
>> >
>> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
>> > at
>> >
>> >
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
>> > at
>> >
>> >
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
>> > at scala.Option.map(Option.scala:146)
>> > at
>> >
>> >
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
>> > at
>> >
>> >
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
>> > at
>> >
>> >
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> > at
>> >
>> >
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >
>> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case 是不是一个
>> > bug?
>> >
>>
>

Re: Field types of query result and registered TableSink [Result] do not match

Posted by zhisheng <zh...@gmail.com>.
对的,这是生产的一个 1.9 blink planner 作业,今天我测试一下 1.10 blink planner

Jark Wu <im...@gmail.com> 于2020年3月18日周三 下午11:47写道:

> Hi zhisheng,
>
> 你用的是1.9吗? 试过 1.10.0 blink planner 么?
>
> On Wed, 18 Mar 2020 at 22:21, zhisheng <zh...@gmail.com> wrote:
>
> > hi, all
> >
> > 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL
> yidun_score
> > 字段也是定义的 numeric(5,2) 类型,结果会报异常。
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method
> > caused an error: Field types of query result and registered TableSink
> > [Result] do not match.
> > Query result schema: [user_new_id: Long, total_credit_score: Integer,
> > total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> > BigDecimal, is_delete: Boolean]
> > TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
> > total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> > BigDecimal, is_delete: Boolean]
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> > at
> >
> >
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
> > at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > at
> >
> >
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> > Caused by: org.apache.flink.table.api.ValidationException: Field types of
> > query result and registered TableSink [Result] do not match.
> > Query result schema: [user_new_id: Long, total_credit_score: Integer,
> > total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> > BigDecimal, is_delete: Boolean]
> > TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
> > total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> > BigDecimal, is_delete: Boolean]
> > at
> >
> >
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
> > at scala.Option.map(Option.scala:146)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >
> > 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case 是不是一个
> > bug?
> >
>

Re: Field types of query result and registered TableSink [Result] do not match

Posted by Jark Wu <im...@gmail.com>.
Hi zhisheng,

你用的是1.9吗? 试过 1.10.0 blink planner 么?

On Wed, 18 Mar 2020 at 22:21, zhisheng <zh...@gmail.com> wrote:

> hi, all
>
> 我定义的一个表的一个字段(yidun_score)是 numeric(5,2) 类型,写入 PostgreSQL 的 DDL yidun_score
> 字段也是定义的 numeric(5,2) 类型,结果会报异常。
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Field types of query result and registered TableSink
> [Result] do not match.
> Query result schema: [user_new_id: Long, total_credit_score: Integer,
> total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> BigDecimal, is_delete: Boolean]
> TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
> total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> BigDecimal, is_delete: Boolean]
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at
>
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> at
>
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> at
>
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink [Result] do not match.
> Query result schema: [user_new_id: Long, total_credit_score: Integer,
> total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> BigDecimal, is_delete: Boolean]
> TableSink schema:    [user_new_id: Long, total_credit_score: Integer,
> total_order_count: Integer, loss_total_order_count: Integer, yidun_score:
> BigDecimal, is_delete: Boolean]
> at
>
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSink(TableSinkUtils.scala:69)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:179)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:178)
> at scala.Option.map(Option.scala:146)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:178)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:146)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>
> 我后面把 numeric(5,2) 类型都改成 double,结果发现就不报异常了,可以正常将数据写入 PG,不知道这个 case 是不是一个
> bug?
>