You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "baiyg25281@hundsun.com" <ba...@hundsun.com> on 2019/03/26 10:02:38 UTC

实现 UpsertStreamTableSink, BatchTableSink 接口代码

大家好!

        伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 江湖救急啊!

        目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] does not match the number[2] of requested type [Java Tuple2<Boolean, Row(ido: String, nameo: String, moneyo: String, timeo: String)>].

       

        主要盲点:
        1、要怎么匹配上这个类型  Tuple2<Boolean, Row> ?这里面决定 update 或 delete 的 Boolean型值 怎么赋? Row 映射进去的底层原理?
        2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
             @Override
             public void setKeyFields(String[] keys) {}
            @Override
            public void setIsAppendOnly(Boolean isAppendOnly){}



baiyg25281@hundsun.com

回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码

Posted by 邓成刚【qq】 <bn...@qq.com>.
这里面决定 update 或 delete 的 Boolean型值 怎么赋?

这里的  Boolean 值 是流类型决定,如果流是APPEND,则为true, update,为false,你直接打印流会有这个字段。。。。

不知道我的理解是否正确,期待大佬解答。。。

邓成刚【qq】
 
发件人: baiyg25281@hundsun.com
发送时间: 2019-03-26 18:02
收件人: user-zh
主题: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
大家好!

        伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 江湖救急啊!

        目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] does not match the number[2] of requested type [Java Tuple2<Boolean, Row(ido: String, nameo: String, moneyo: String, timeo: String)>].

       

        主要盲点:
        1、要怎么匹配上这个类型  Tuple2<Boolean, Row> ?这里面决定 update 或 delete 的 Boolean型值 怎么赋? Row 映射进去的底层原理?
        2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
             @Override
             public void setKeyFields(String[] keys) {}
            @Override
            public void setIsAppendOnly(Boolean isAppendOnly){}

baiyg25281@hundsun.com

Re: Re: 实现 UpsertStreamTableSink, BatchTableSink 接口代码

Posted by 邓成刚【qq】 <bn...@qq.com>.
sql:
select EVENTTIME,ID,EVENT_ID,MSISDN,TS 
from (select a.*,ROW_NUMBER() over(partition by EVENT_ID,MSISDN order by TS desc) AS rw
          from table1 a
) where rw = 1

tableEnv.toRetractStream(结果表, Row.class).print();


输出结果,分析结果发现,第二条的  1553652720961584  比第一条的时间 1553652720927835 更大,同时输出一条 false 的,数据结果与第一条相同,説明第三条是用来作删除操作,删掉第一条数据。。。

(true,2019-03-27 02:12:00.0,1243296274875303847,"1c3.2729.20190327021200",XXXXXX,1553652720927835)
(true,2019-03-27 02:12:00.0,1243296274875303910,"1c3.2729.20190327021200",XXXXXX,1553652720961584)
(false,2019-03-27 02:12:00.0,1243296274875303847,"1c3.2729.20190327021200",XXXXXX,1553652720927835)

结论:true 是用来插入数据的,false 是用来删除数据的,出现false时一定会有一条之前插入的数据 。。。

邓成刚【qq】
 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:40
收件人: user-zh
主题: Re: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
不好意思,我理解错了,更正一下:
APPEND 流是没有这个字段的,只有更新流才有,true 表示 APPEND ,false 表示 update,这个值应该是流发出数据时自己带的,不是人为赋值的。。。
 
 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:27
收件人: user-zh
主题: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
这里面决定 update 或 delete 的 Boolean型值 怎么赋?
 
这里的  Boolean 值 是流类型决定,如果流是APPEND,则为true, update,为false,你直接打印流会有这个字段。。。。
 
不知道我的理解是否正确,期待大佬解答。。。
 
邓成刚【qq】
 
发件人: baiyg25281@hundsun.com
发送时间: 2019-03-26 18:02
收件人: user-zh
主题: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
大家好!
 
        伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 江湖救急啊!
 
        目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] does not match the number[2] of requested type [Java Tuple2<Boolean, Row(ido: String, nameo: String, moneyo: String, timeo: String)>].
 
       
 
        主要盲点:
        1、要怎么匹配上这个类型  Tuple2<Boolean, Row> ?这里面决定 update 或 delete 的 Boolean型值 怎么赋? Row 映射进去的底层原理?
        2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
             @Override
             public void setKeyFields(String[] keys) {}
            @Override
            public void setIsAppendOnly(Boolean isAppendOnly){}
 
baiyg25281@hundsun.com

Re: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码

Posted by 邓成刚【qq】 <bn...@qq.com>.
不好意思,我理解错了,更正一下:
APPEND 流是没有这个字段的,只有更新流才有,true 表示 APPEND ,false 表示 update,这个值应该是流发出数据时自己带的,不是人为赋值的。。。

 
发件人: 邓成刚【qq】
发送时间: 2019-03-26 18:27
收件人: user-zh
主题: 回复: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
这里面决定 update 或 delete 的 Boolean型值 怎么赋?
 
这里的  Boolean 值 是流类型决定,如果流是APPEND,则为true, update,为false,你直接打印流会有这个字段。。。。
 
不知道我的理解是否正确,期待大佬解答。。。
 
邓成刚【qq】
 
发件人: baiyg25281@hundsun.com
发送时间: 2019-03-26 18:02
收件人: user-zh
主题: 实现 UpsertStreamTableSink, BatchTableSink 接口代码
大家好!
 
        伙伴们,附件有实现 blink 中 flink-table 模块  UpsertStreamTableSink, BatchTableSink 接口代码 ,自实现类放在 flink-jdbc 模块  org.apache.flink.api.java.io.jdbc 包下。大神们帮忙看看呗! 江湖救急啊!
 
        目前实现后,在代码中调用,报异常:
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [4] of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@377dfb8d] does not match the number[2] of requested type [Java Tuple2<Boolean, Row(ido: String, nameo: String, moneyo: String, timeo: String)>].
 
       
 
        主要盲点:
        1、要怎么匹配上这个类型  Tuple2<Boolean, Row> ?这里面决定 update 或 delete 的 Boolean型值 怎么赋? Row 映射进去的底层原理?
        2、这两个改怎么重写?另外 keys 和   isAppendOnly 调用时该怎么赋值?
             @Override
             public void setKeyFields(String[] keys) {}
            @Override
            public void setIsAppendOnly(Boolean isAppendOnly){}
 
baiyg25281@hundsun.com