Posted to user-zh@flink.apache.org by Zhou Zach <wa...@163.com> on 2020/07/13 13:09:03 UTC
flink sink to both HBase and Hive; HBase is missing records
Flink subscribes to Kafka messages and sinks them to both HBase and Hive.
After sending 42 records to Kafka and then stopping the producer, Hive returns exactly 42 rows, but HBase only returns 30.
query:
streamTableEnv.executeSql(
"""
|
|CREATE TABLE hbase_table (
| rowkey VARCHAR,
| cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
|) WITH (
| 'connector.type' = 'hbase',
| 'connector.version' = '2.1.0',
| 'connector.table-name' = 'ods:user_hbase6',
| 'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
| 'connector.zookeeper.znode.parent' = '/hbase',
| 'connector.write.buffer-flush.max-size' = '1mb',
| 'connector.write.buffer-flush.max-rows' = '1',
| 'connector.write.buffer-flush.interval' = '0s'
|)
|""".stripMargin)
val statementSet = streamTableEnv.createStatementSet()
val insertHbase =
"""
|insert into hbase_table
|SELECT
| CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
| ROW(sex, age, created_time ) as cf
|FROM (select uid,sex,age, cast(created_time as VARCHAR) as created_time from kafka_table)
|
|""".stripMargin
statementSet.addInsertSql(insertHbase)
val insertHive =
"""
|
|INSERT INTO odsCatalog.ods.hive_table
|SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), DATE_FORMAT(created_time, 'HH')
|FROM kafka_table
|
|""".stripMargin
statementSet.addInsertSql(insertHive)
statementSet.execute()
Is it caused by 'connector.write.buffer-flush.max-size' = '1mb'? I tried setting it to '0', '10b', and '1kb', and all failed with errors like:
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1kb
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 10b
Property 'connector.write.buffer-flush.max-size' must be a memory size (in bytes) value but was: 1
Also, setting the options according to the official documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
is not recognized either; it fails with:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hbase-2.1.0' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
I had a look at the source code:
org.apache.flink.table.descriptors.HBaseValidator
public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";
The option keys there are still the old ones.
Re: Reply: Re: flink sink to both HBase and Hive; HBase is missing records
Posted by Zhou Zach <wa...@163.com>.
Hi,
Thanks for the community's helpful answers!
On 2020-07-14 11:00:18, "夏帅" <jk...@dingtalk.com.INVALID> wrote:
>Hi,
>Under the hood it is still the StreamingFileSink, so currently it can only append.
Reply: Re: flink sink to both HBase and Hive; HBase is missing records
Posted by 夏帅 <jk...@dingtalk.com.INVALID>.
Hi,
Under the hood it is still the StreamingFileSink, so currently it can only append.
Re: Re: flink sink to both HBase and Hive; HBase is missing records
Posted by Zhou Zach <wa...@163.com>.
Hi Leonard,
It turns out there were duplicate keys, and HBase upserted them. Does Hive Streaming Writing currently only support append mode, with no upsert mode?
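[Editor's note] For context, the rowkey in the original query is an MD5 prefix plus a minute bucket plus sex, so any two events for the same uid and sex within one minute map to the same rowkey, and HBase keeps only the last write. Below is a minimal sketch of a collision-resistant variant, assuming the same kafka_table schema as in the thread; keeping the raw uid and a second-granularity timestamp in the key is my addition, not something from the thread:

```sql
INSERT INTO hbase_table
SELECT
  CONCAT(
    SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6),         -- keep the hash prefix to spread regions
    CAST(uid AS VARCHAR), '_',                          -- raw uid keeps different users distinct
    CAST(UNIX_TIMESTAMP(created_time) AS STRING), '_',  -- seconds instead of minute buckets
    sex
  ) AS rowkey,
  ROW(sex, age, created_time) AS cf
FROM (
  SELECT uid, sex, age, CAST(created_time AS VARCHAR) AS created_time
  FROM kafka_table
)
```

Two events for the same uid within the same second would still collide; if every source record must survive, the key needs a strictly unique component such as the Kafka partition/offset.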
Re: flink sink to both HBase and Hive; HBase is missing records
Posted by Leonard Xu <xb...@gmail.com>.
Hi,
> On 2020-07-14 09:52, Zhou Zach <wa...@163.com> wrote:
>
>>> | CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
Could you check whether the rowkeys extracted by that expression contain duplicates?
Best,
Leonard Xu
Re: Re: flink sink to both HBase and Hive; HBase is missing records
Posted by Zhou Zach <wa...@163.com>.
Hi, Leonard
I set 'connector.write.buffer-flush.interval' = '1s' and restarted the job.
At first the counts matched: after sending, say, 4 messages, both Hive and HBase had received 4 rows. After sending 48 messages I stopped the producer,
and the result was 19 rows in HBase versus 48 in Hive. If Flink polls every 1s to check whether the HBase sink buffer has reached 1mb and only flushes when it has, that still would not explain why HBase and Hive were in sync at the beginning. Strange.
Re: flink sink to both HBase and Hive; HBase is missing records
Posted by Leonard Xu <xb...@gmail.com>.
Hi, Zhou
> 'connector.write.buffer-flush.max-size' = '1mb',
> 'connector.write.buffer-flush.interval' = '0s'
(1) connector.write.buffer-flush.max-size only accepts the 'mb' unit; other units are rejected with the errors you saw. It configures the buffer behind the BufferedMutator: the sink flushes once the buffered data reaches that size, while flush.interval flushes on a fixed timer, and the two options are meant to be combined as needed. When connector.write.buffer-flush.interval is set to 0s there is no timer at all, so writes only happen once the buffer reaches max-size. Setting connector.write.buffer-flush.interval to 1s should make the data show up.
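[Editor's note] Concretely, under point (1) only the interval line in the original WITH clause needs to change; a sketch of just the two flush options (names unchanged from the thread):

```sql
'connector.write.buffer-flush.max-size' = '1mb',  -- only the 'mb' unit is accepted
'connector.write.buffer-flush.interval' = '1s'    -- flush on a 1-second timer instead of never
```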
(2) Versions of the HBase connector before 1.11.0 only support HBase 1.4.3, which is why specifying 2.1.0 fails. Starting with 1.11.0 the supported line is 1.4.x, and the new connector in 1.11.0 takes 'connector' = 'hbase-1.4' (the HBase 1.4.x client APIs are compatible across that line). The community is also discussing support for HBase 2.x [1].
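[Editor's note] Based on the Flink 1.11 documentation linked earlier in the thread, the new-style DDL would look roughly like this. This is a sketch: the table name and ZooKeeper quorum are copied from the thread, and the flink-connector-hbase jar must be on the classpath for the 'hbase-1.4' factory to be found:

```sql
CREATE TABLE hbase_table (
  rowkey STRING,
  cf ROW<sex STRING, age INT, created_time STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED   -- the rowkey column is the HBase row key
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'ods:user_hbase6',
  'zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
  'zookeeper.znode.parent' = '/hbase',
  'sink.buffer-flush.max-size' = '1mb',
  'sink.buffer-flush.max-rows' = '1',
  'sink.buffer-flush.interval' = '1s'
)
```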
Best,
Leonard Xu
[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674