Posted to user-zh@flink.apache.org by Zhou Zach <wa...@163.com> on 2020/07/13 13:09:03 UTC

flink sinks to both hbase and hive, hbase is missing records


Flink subscribes to Kafka messages and sinks them to both HBase and Hive.
After sending 42 records to Kafka and then stopping the producer, a query against Hive returns exactly 42 rows, but HBase shows only 30.


query:
streamTableEnv.executeSql(
      """
        |
        |CREATE TABLE hbase_table (
        |    rowkey VARCHAR,
        |    cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
        |) WITH (
        |    'connector.type' = 'hbase',
        |    'connector.version' = '2.1.0',
        |    'connector.table-name' = 'ods:user_hbase6',
        |    'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
        |    'connector.zookeeper.znode.parent' = '/hbase',
        |    'connector.write.buffer-flush.max-size' = '1mb',
        |    'connector.write.buffer-flush.max-rows' = '1',
        |    'connector.write.buffer-flush.interval' = '0s'
        |)
        |""".stripMargin)

    val statementSet = streamTableEnv.createStatementSet()
    val insertHbase =
      """
        |insert into hbase_table
        |SELECT
        |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
        |   ROW(sex, age, created_time ) as cf
        |FROM  (select uid,sex,age, cast(created_time as VARCHAR) as created_time from kafka_table)
        |
        |""".stripMargin

    statementSet.addInsertSql(insertHbase)

    val insertHive =
      """
        |
        |INSERT INTO odsCatalog.ods.hive_table
        |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
        |FROM kafka_table
        |
        |""".stripMargin
    statementSet.addInsertSql(insertHive)


    statementSet.execute()


Is it because of the option 'connector.write.buffer-flush.max-size' = '1mb'? I tried setting it to '0', '10b', and '1kb', but they all failed with the following errors:
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1kb
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 10b
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1






Also, the options from the official documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html

are not recognized either; the error is:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hbase-2.1.0' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.


I looked at the source code:
org.apache.flink.table.descriptors.HBaseValidator
public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
    public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
    public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
    public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
    public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
    public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
    public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
    public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";
the option keys are still the old ones.

Re: Re: Re: flink sinks to both hbase and hive, hbase is missing records

Posted by Zhou Zach <wa...@163.com>.
Hi,
Thanks to the community for the helpful answers!

On 2020-07-14 11:00:18, "夏帅" <jk...@dingtalk.com.INVALID> wrote:
>Hi,
>Under the hood it is still the StreamingFileSink, so currently only append is supported
>
>
>------------------------------------------------------------------
>From: Zhou Zach <wa...@163.com>
>Sent: Tuesday, July 14, 2020 10:56
>To: user-zh <us...@flink.apache.org>
>Subject: Re: Re: flink sinks to both hbase and hive, hbase is missing records
>
>
>
>
>Hi Leonard,
>It turns out there were duplicate keys and HBase performed an upsert. Does Hive Streaming Writing currently only support append mode, not upsert mode?
>
>On 2020-07-14 09:56:00, "Leonard Xu" <xb...@gmail.com> wrote:
>>Hi,
>>
>>> On 2020-07-14 09:52, Zhou Zach <wa...@163.com> wrote:
>>>
>>>>>       |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>>
>>Could you check whether this extracted rowkey contains duplicates?
>>
>>Best,
>>Leonard Xu

Re: Re: flink sinks to both hbase and hive, hbase is missing records

Posted by 夏帅 <jk...@dingtalk.com.INVALID>.
Hi,
Under the hood it is still the StreamingFileSink, so currently only append is supported

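For reference, a minimal sketch of what such an append-only Hive sink table looks like in Flink 1.11 (the odsCatalog catalog and table name come from the job above; the dt/hr column names, the types, and the partition-commit values are illustrative assumptions):

-- Sketch only: run with the Hive dialect and odsCatalog as the current catalog.
CREATE TABLE ods.hive_table (
    uid BIGINT,
    age INT
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    -- how the dt/hr partition values map back to an event timestamp
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    -- commit a partition once its time (plus the delay) has passed
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '0s',
    -- on commit, add the partition to the metastore and write a _SUCCESS file
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
)

The streaming INSERT INTO then only appends new files into these partitions; rows that were already written cannot be updated.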

------------------------------------------------------------------
From: Zhou Zach <wa...@163.com>
Sent: Tuesday, July 14, 2020 10:56
To: user-zh <us...@flink.apache.org>
Subject: Re: Re: flink sinks to both hbase and hive, hbase is missing records




Hi Leonard,
It turns out there were duplicate keys and HBase performed an upsert. Does Hive Streaming Writing currently only support append mode, not upsert mode?

On 2020-07-14 09:56:00, "Leonard Xu" <xb...@gmail.com> wrote:
>Hi,
>
>> On 2020-07-14 09:52, Zhou Zach <wa...@163.com> wrote:
>>
>>>>       |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>
>Could you check whether this extracted rowkey contains duplicates?
>
>Best,
>Leonard Xu

Re: Re: flink sinks to both hbase and hive, hbase is missing records

Posted by Zhou Zach <wa...@163.com>.


Hi Leonard,
It turns out there were duplicate keys and HBase performed an upsert. Does Hive Streaming Writing currently only support append mode, not upsert mode?

On 2020-07-14 09:56:00, "Leonard Xu" <xb...@gmail.com> wrote:
>Hi,
>
>> On 2020-07-14 09:52, Zhou Zach <wa...@163.com> wrote:
>>
>>>>       |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>
>Could you check whether this extracted rowkey contains duplicates?
>
>Best,
>Leonard Xu

Re: flink sinks to both hbase and hive, hbase is missing records

Posted by Leonard Xu <xb...@gmail.com>.
Hi,

> On 2020-07-14 09:52, Zhou Zach <wa...@163.com> wrote:
> 
>>>       |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,

Could you check whether this extracted rowkey contains duplicates?
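For example, a check along these lines (just a sketch against the kafka_table from your job) would show whether the generated key collides:

-- Group the rows by the same rowkey expression used in the HBase insert;
-- any count > 1 means HBase overwrites on upsert and the total row count drops.
SELECT rk, COUNT(*) AS cnt
FROM (
    SELECT CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6),
                  CAST(CEILING(UNIX_TIMESTAMP(CAST(created_time AS VARCHAR)) / 60) AS STRING),
                  sex) AS rk
    FROM kafka_table
)
GROUP BY rk
HAVING COUNT(*) > 1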

Best,
Leonard Xu

Re: Re: flink sinks to both hbase and hive, hbase is missing records

Posted by Zhou Zach <wa...@163.com>.





Hi, Leonard
I set 'connector.write.buffer-flush.interval' = '1s' and restarted the job.
Right after the producer started, e.g. after 4 messages had been sent, both Hive and HBase had received 4 records. When 48 messages had been sent I stopped the producer.
Querying again, HBase had 19 rows while Hive had 48. If Flink checked every second whether the HBase sink buffer had reached 1mb and only flushed once it had, that still would not explain why HBase and Hive were in sync at the very beginning. Strange.

On 2020-07-13 21:50:54, "Leonard Xu" <xb...@gmail.com> wrote:
>Hi, Zhou
>
>
>>       'connector.write.buffer-flush.max-size' = '1mb',
>>       'connector.write.buffer-flush.interval' = '0s'
>
>(1) The connector.write.buffer-flush.max-size option only accepts the mb unit, so other units fail with the errors you saw. It tunes the buffer of the BufferedMutator and defines how much data is buffered before a write is triggered; the flush.interval option flushes the buffer periodically, and the two are used together as needed. When connector.write.buffer-flush.interval is set to 0s there is no periodic flush, so data is only written once the buffer reaches connector.write.buffer-flush.max-size. If you set connector.write.buffer-flush.interval to 1s you should see the data.
>
>(2) Before 1.11.0 the HBase connector only supported 1.4.3, which is why specifying 2.1.0 fails. Starting with 1.11.0 it supports 1.4.x, so the option in the new 1.11.0 connector is 'connector' = 'hbase-1.4' (the HBase 1.4.x APIs are compatible). The community is also discussing support for HBase 2.x [1].
>
>
>Best,
>Leonard Xu
>[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674

Re: flink sinks to both hbase and hive, hbase is missing records

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Zhou


>       'connector.write.buffer-flush.max-size' = '1mb',
>       'connector.write.buffer-flush.interval' = '0s'

(1) The connector.write.buffer-flush.max-size option only accepts the mb unit, so other units fail with the errors you saw. It tunes the buffer of the BufferedMutator and defines how much data is buffered before a write is triggered; the flush.interval option flushes the buffer periodically, and the two are used together as needed. When connector.write.buffer-flush.interval is set to 0s there is no periodic flush, so data is only written once the buffer reaches connector.write.buffer-flush.max-size. If you set connector.write.buffer-flush.interval to 1s you should see the data.
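For example, a setting along these lines (the max-size value is illustrative; per the above, only the mb unit is accepted) flushes at least once per second and should make a small test batch visible promptly:

    'connector.write.buffer-flush.max-size' = '2mb',
    'connector.write.buffer-flush.max-rows' = '1',
    'connector.write.buffer-flush.interval' = '1s'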

(2) Before 1.11.0 the HBase connector only supported 1.4.3, which is why specifying 2.1.0 fails. Starting with 1.11.0 it supports 1.4.x, so the option in the new 1.11.0 connector is 'connector' = 'hbase-1.4' (the HBase 1.4.x APIs are compatible). The community is also discussing support for HBase 2.x [1].
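A sketch of the corresponding new-style DDL with the 1.11 option names (the buffer-flush values are illustrative):

CREATE TABLE hbase_table (
    rowkey VARCHAR,
    cf ROW(sex VARCHAR, age INT, created_time VARCHAR),
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'ods:user_hbase6',
    'zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    'zookeeper.znode.parent' = '/hbase',
    'sink.buffer-flush.max-size' = '2mb',
    'sink.buffer-flush.max-rows' = '1',
    'sink.buffer-flush.interval' = '1s'
)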


Best,
Leonard Xu
[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674

