You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by star <31...@qq.com> on 2020/06/05 02:22:13 UTC

flink1.9 Sql 注册的中间临时表不自动存state的吗?

大家好&nbsp; &nbsp;我使用的flink 1.9的blink planner



先按 月 和城市 去重id&nbsp; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable&nbsp; &nbsp;rowkey是month+city




然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city






运行了近18个小时 中间有过restore和ck失败,然后我统计了&nbsp; 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因


伪代码如下:




val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, bsSettings)


myDataStream=......




tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id)




//按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复)
val monthCount = tableEnv.sqlQuery(
&nbsp; &nbsp; &nbsp; s"""
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;select month,city,count(distinct id) as cnt from monthtable&nbsp; group by month,city
&nbsp; &nbsp; &nbsp; """.stripMargin)


//将月统计结果输出到hbase,rokey为month+city
monthCount.toRetractStream[Row].filter(_._1).map(line=&gt;{
&nbsp; &nbsp; &nbsp; val row=line._2
&nbsp; &nbsp; &nbsp; val month=row.getField(0).toString
&nbsp; &nbsp; &nbsp; val city=row.getField(1).toString
&nbsp; &nbsp; &nbsp; val cnt=row.getField(2).toString
&nbsp; &nbsp; &nbsp; val map=new util.HashMap[String,String]()
&nbsp; &nbsp; &nbsp; map.put("cnt",cnt)
&nbsp; &nbsp; &nbsp; (month+city,map)// month+city是rowkey&nbsp; cnt是一个column
&nbsp; &nbsp; }).addSink(new MyHbaseSink("monthHbaseTable")






&nbsp;//将上面的月表注册成新表 monthStat
&nbsp;tableEnv.registerTable("monthStat",monthCount)


&nbsp;//按城市统计id的数量
&nbsp;val totalCount = tableEnv.sqlQuery(
&nbsp; &nbsp; &nbsp; s"""
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;|select city,sum(cnt) as cityCnt from monthStat&nbsp; group by city
&nbsp; &nbsp; &nbsp; """.stripMargin)


//将月统计结果输出到hbase,rokey为city
&nbsp; &nbsp; totalCount.toRetractStream[Row].filter(_._1).map(line=&gt;{
&nbsp; &nbsp; &nbsp; val row=line._2
&nbsp; &nbsp; &nbsp; val city=row.getField(0).toString
&nbsp; &nbsp; &nbsp; val totalCnt=row.getField(1).toString
&nbsp; &nbsp; &nbsp; val map=new util.HashMap[String,String]()
&nbsp; &nbsp; &nbsp; map.put("totalCnt",totalCnt)
&nbsp; &nbsp; &nbsp; (city,map)
&nbsp; &nbsp; }).addSink("totalHbaseTable")

回复: flink1.9 Sql 注册的中间临时表不自动存state的吗?

Posted by star <31...@qq.com>.
感谢您的回复,复现应该可以,就是数据集太大了,而且是生产数据,需要脱敏拿下来,操作起来比较难。


另外我看了对不上几条数据,数据的处理的时间和程序restore的时间差很多,好像还不是restore引起了


那个时间点也没有error日志


------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Benchao Li"<libenchao@gmail.com&gt;;
发送时间:&nbsp;2020年6月5日(星期五) 下午5:53
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: flink1.9 Sql 注册的中间临时表不自动存state的吗?



Hi,

看你的问题描述,我们可能遇到过类似的问题。

我们的问题是,从cp恢复之后,某些key跟之前的状态里的key对不上了,所以就有点类似于丢失了一部分状态。

但是我们也没有查出来具体的原因,一方面是因为问题比较难以复现,我们用线上数据,也只是有部分数据有问题,
也看不出来这部分有问题的数据有什么规律;另一方面是blink planner底层用的都是binary的数据结构,debug起来也会
比较困难。

如果你能提供一个比较稳定的能复现的数据集和测试方法,我觉得这个问题我们可以再推进解决一下。

star <3149768603@qq.com&gt; 于2020年6月5日周五 下午4:02写道:

&gt; 各位大佬有遇到过类似问题吗?
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"star"<3149768603@qq.com&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年6月5日(星期五) 上午10:40
&gt; 收件人:&amp;nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;回复:flink1.9 Sql 注册的中间临时表不自动存state的吗?
&gt;
&gt;
&gt;
&gt; 没有使用窗口,状态不该被清理,但从结果看貌似被清理过呀
&gt;
&gt;
&gt;
&gt;
&gt; ------------------ 原始邮件 ------------------
&gt; 发件人:&amp;nbsp;"zhiyezou"<1530130567@qq.com&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年6月5日(星期五) 上午10:31
&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;回复:flink1.9 Sql 注册的中间临时表不自动存state的吗?
&gt;
&gt;
&gt;
&gt; Hi
&gt;
&gt;
&gt; 如果没有使用窗口的话,状态是不会被自动清理掉的,需要自己主动配置状态的TTL
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
&gt; 发件人:&amp;amp;nbsp;"star"<3149768603@qq.com&amp;amp;gt;;
&gt; 发送时间:&amp;amp;nbsp;2020年6月5日(星期五) 上午10:22
&gt; 收件人:&amp;amp;nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&amp;amp;gt;;
&gt;
&gt; 主题:flink1.9 Sql 注册的中间临时表不自动存state的吗?
&gt;
&gt;
&gt;
&gt; 大家好,我使用的flink 1.9的blink planner
&gt;
&gt;
&gt;
&gt; 先按 月 和城市 去重id; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable;rowkey是month+city
&gt;
&gt;
&gt;
&gt;
&gt; 然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; 运行了近18个小时 中间有过restore和ck失败,然后我统计了&amp;amp;amp;nbsp;
&gt; 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因
&gt;
&gt;
&gt; 伪代码如下:
&gt;
&gt;
&gt;
&gt;
&gt; val env = StreamExecutionEnvironment.getExecutionEnvironment
&gt;
&gt; env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
&gt; val bsSettings =
&gt; EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
&gt; val tableEnv = StreamTableEnvironment.create(env, bsSettings)
&gt;
&gt;
&gt; myDataStream=......
&gt;
&gt;
&gt;
&gt;
&gt; tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id)
&gt;
&gt;
&gt;
&gt;
&gt; //按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复)
&gt; val monthCount = tableEnv.sqlQuery(
&gt; &amp;nbsp;s"""
&gt; select month,city,count(distinct id) as cnt from monthtable group by
&gt; month,city
&gt; &amp;nbsp;""".stripMargin)
&gt;
&gt;
&gt; //将月统计结果输出到hbase,rokey为month+city
&gt; monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;amp;gt;{
&gt; val row=line._2
&gt; &amp;nbsp;val month=row.getField(0).toString
&gt; val city=row.getField(1).toString
&gt; &amp;nbsp;val cnt=row.getField(2).toString
&gt; val map=new util.HashMap[String,String]()
&gt; &amp;nbsp;map.put("cnt",cnt)
&gt; &amp;nbsp;(month+city,map)&amp;nbsp; &amp;nbsp; &amp;nbsp;// month+city是rowkey cnt是一个column
&gt; &amp;nbsp;}).addSink(new MyHbaseSink("monthHbaseTable")
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; //将上面的月表注册成新表 monthStat
&gt; tableEnv.registerTable("monthStat",monthCount)
&gt;
&gt;
&gt; //按城市统计id的数量
&gt; val totalCount = tableEnv.sqlQuery(
&gt; s"""
&gt; select city,sum(cnt) as cityCnt from monthStat&amp;amp;amp;nbsp; group by city
&gt; &amp;nbsp;""".stripMargin)
&gt;
&gt;
&gt; //将月统计结果输出到hbase,rokey为city
&gt; &amp;nbsp;totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;amp;gt;{
&gt; val row=line._2
&gt; val city=row.getField(0).toString
&gt; val totalCnt=row.getField(1).toString
&gt; val map=new util.HashMap[String,String]()
&gt; map.put("totalCnt",totalCnt)
&gt; (city,map)
&gt; &amp;nbsp;}).addSink("totalHbaseTable")



-- 

Best,
Benchao Li

Re: flink1.9 Sql 注册的中间临时表不自动存state的吗?

Posted by Benchao Li <li...@gmail.com>.
Hi,

看你的问题描述,我们可能遇到过类似的问题。

我们的问题是,从cp恢复之后,某些key跟之前的状态里的key对不上了,所以就有点类似于丢失了一部分状态。

但是我们也没有查出来具体的原因,一方面是因为问题比较难以复现,我们用线上数据,也只是有部分数据有问题,
也看不出来这部分有问题的数据有什么规律;另一方面是blink planner底层用的都是binary的数据结构,debug起来也会
比较困难。

如果你能提供一个比较稳定的能复现的数据集和测试方法,我觉得这个问题我们可以再推进解决一下。

star <31...@qq.com> 于2020年6月5日周五 下午4:02写道:

> 各位大佬有遇到过类似问题吗?
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"star"<3149768603@qq.com&gt;;
> 发送时间:&nbsp;2020年6月5日(星期五) 上午10:40
> 收件人:&nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;回复:flink1.9 Sql 注册的中间临时表不自动存state的吗?
>
>
>
> 没有使用窗口,状态不该被清理,但从结果看貌似被清理过呀
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人:&nbsp;"zhiyezou"<1530130567@qq.com&gt;;
> 发送时间:&nbsp;2020年6月5日(星期五) 上午10:31
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;回复:flink1.9 Sql 注册的中间临时表不自动存state的吗?
>
>
>
> Hi
>
>
> 如果没有使用窗口的话,状态是不会被自动清理掉的,需要自己主动配置状态的TTL
>
>
>
>
> ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> 发件人:&amp;nbsp;"star"<3149768603@qq.com&amp;gt;;
> 发送时间:&amp;nbsp;2020年6月5日(星期五) 上午10:22
> 收件人:&amp;nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&amp;gt;;
>
> 主题:flink1.9 Sql 注册的中间临时表不自动存state的吗?
>
>
>
> 大家好,我使用的flink 1.9的blink planner
>
>
>
> 先按 月 和城市 去重id; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable;rowkey是month+city
>
>
>
>
> 然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city
>
>
>
>
>
>
> 运行了近18个小时 中间有过restore和ck失败,然后我统计了&amp;amp;nbsp;
> 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因
>
>
> 伪代码如下:
>
>
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> val bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val tableEnv = StreamTableEnvironment.create(env, bsSettings)
>
>
> myDataStream=......
>
>
>
>
> tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id)
>
>
>
>
> //按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复)
> val monthCount = tableEnv.sqlQuery(
> &nbsp;s"""
> select month,city,count(distinct id) as cnt from monthtable group by
> month,city
> &nbsp;""".stripMargin)
>
>
> //将月统计结果输出到hbase,rokey为month+city
> monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;gt;{
> val row=line._2
> &nbsp;val month=row.getField(0).toString
> val city=row.getField(1).toString
> &nbsp;val cnt=row.getField(2).toString
> val map=new util.HashMap[String,String]()
> &nbsp;map.put("cnt",cnt)
> &nbsp;(month+city,map)&nbsp; &nbsp; &nbsp;// month+city是rowkey cnt是一个column
> &nbsp;}).addSink(new MyHbaseSink("monthHbaseTable")
>
>
>
>
>
>
> //将上面的月表注册成新表 monthStat
> tableEnv.registerTable("monthStat",monthCount)
>
>
> //按城市统计id的数量
> val totalCount = tableEnv.sqlQuery(
> s"""
> select city,sum(cnt) as cityCnt from monthStat&amp;amp;nbsp; group by city
> &nbsp;""".stripMargin)
>
>
> //将月统计结果输出到hbase,rokey为city
> &nbsp;totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;gt;{
> val row=line._2
> val city=row.getField(0).toString
> val totalCnt=row.getField(1).toString
> val map=new util.HashMap[String,String]()
> map.put("totalCnt",totalCnt)
> (city,map)
> &nbsp;}).addSink("totalHbaseTable")



-- 

Best,
Benchao Li

回复:flink1.9 Sql 注册的中间临时表不自动存state的吗?

Posted by star <31...@qq.com>.
各位大佬有遇到过类似问题吗?




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"star"<3149768603@qq.com&gt;;
发送时间:&nbsp;2020年6月5日(星期五) 上午10:40
收件人:&nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复:flink1.9 Sql 注册的中间临时表不自动存state的吗?



没有使用窗口,状态不该被清理,但从结果看貌似被清理过呀




------------------ 原始邮件 ------------------
发件人:&nbsp;"zhiyezou"<1530130567@qq.com&gt;;
发送时间:&nbsp;2020年6月5日(星期五) 上午10:31
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复:flink1.9 Sql 注册的中间临时表不自动存state的吗?



Hi


如果没有使用窗口的话,状态是不会被自动清理掉的,需要自己主动配置状态的TTL




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&amp;nbsp;"star"<3149768603@qq.com&amp;gt;;
发送时间:&amp;nbsp;2020年6月5日(星期五) 上午10:22
收件人:&amp;nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&amp;gt;;

主题:flink1.9 Sql 注册的中间临时表不自动存state的吗?



大家好,我使用的flink 1.9的blink planner



先按 月 和城市 去重id; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable;rowkey是month+city




然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city






运行了近18个小时 中间有过restore和ck失败,然后我统计了&amp;amp;nbsp; 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因


伪代码如下:




val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, bsSettings)


myDataStream=......




tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id)




//按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复)
val monthCount = tableEnv.sqlQuery(
&nbsp;s"""
select month,city,count(distinct id) as cnt from monthtable group by month,city
&nbsp;""".stripMargin)


//将月统计结果输出到hbase,rokey为month+city
monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;gt;{
val row=line._2
&nbsp;val month=row.getField(0).toString
val city=row.getField(1).toString
&nbsp;val cnt=row.getField(2).toString
val map=new util.HashMap[String,String]()
&nbsp;map.put("cnt",cnt)
&nbsp;(month+city,map)&nbsp; &nbsp; &nbsp;// month+city是rowkey cnt是一个column
&nbsp;}).addSink(new MyHbaseSink("monthHbaseTable")






//将上面的月表注册成新表 monthStat
tableEnv.registerTable("monthStat",monthCount)


//按城市统计id的数量
val totalCount = tableEnv.sqlQuery(
s"""
select city,sum(cnt) as cityCnt from monthStat&amp;amp;nbsp; group by city
&nbsp;""".stripMargin)


//将月统计结果输出到hbase,rokey为city
&nbsp;totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;gt;{
val row=line._2
val city=row.getField(0).toString
val totalCnt=row.getField(1).toString
val map=new util.HashMap[String,String]()
map.put("totalCnt",totalCnt)
(city,map)
&nbsp;}).addSink("totalHbaseTable")

回复:flink1.9 Sql 注册的中间临时表不自动存state的吗?

Posted by star <31...@qq.com>.
没有使用窗口,状态不该被清理,但从结果看貌似被清理过呀




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"zhiyezou"<1530130567@qq.com&gt;;
发送时间:&nbsp;2020年6月5日(星期五) 上午10:31
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复:flink1.9 Sql 注册的中间临时表不自动存state的吗?



Hi


如果没有使用窗口的话,状态是不会被自动清理掉的,需要自己主动配置状态的TTL




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&amp;nbsp;"star"<3149768603@qq.com&amp;gt;;
发送时间:&amp;nbsp;2020年6月5日(星期五) 上午10:22
收件人:&amp;nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&amp;gt;;

主题:&amp;nbsp;flink1.9 Sql 注册的中间临时表不自动存state的吗?



大家好&amp;amp;nbsp; &amp;amp;nbsp;我使用的flink 1.9的blink planner



先按 月 和城市 去重id&amp;amp;nbsp; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable&amp;amp;nbsp; &amp;amp;nbsp;rowkey是month+city




然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city






运行了近18个小时 中间有过restore和ck失败,然后我统计了&amp;amp;nbsp; 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因


伪代码如下:




val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, bsSettings)


myDataStream=......




tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id)




//按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复)
val monthCount = tableEnv.sqlQuery(
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; s"""
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;select month,city,count(distinct id) as cnt from monthtable&amp;amp;nbsp; group by month,city
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; """.stripMargin)


//将月统计结果输出到hbase,rokey为month+city
monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;gt;{
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; val row=line._2
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; val month=row.getField(0).toString
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; val city=row.getField(1).toString
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; val cnt=row.getField(2).toString
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; val map=new util.HashMap[String,String]()
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; map.put("cnt",cnt)
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; (month+city,map)// month+city是rowkey&amp;amp;nbsp; cnt是一个column
&amp;amp;nbsp; &amp;amp;nbsp; }).addSink(new MyHbaseSink("monthHbaseTable")






&amp;amp;nbsp;//将上面的月表注册成新表 monthStat
&amp;amp;nbsp;tableEnv.registerTable("monthStat",monthCount)


&amp;amp;nbsp;//按城市统计id的数量
&amp;amp;nbsp;val totalCount = tableEnv.sqlQuery(
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; s"""
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;|select city,sum(cnt) as cityCnt from monthStat&amp;amp;nbsp; group by city
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; """.stripMargin)


//将月统计结果输出到hbase,rokey为city
&amp;amp;nbsp; &amp;amp;nbsp; totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;amp;gt;{
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; val row=line._2
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; val city=row.getField(0).toString
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; val totalCnt=row.getField(1).toString
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; val map=new util.HashMap[String,String]()
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; map.put("totalCnt",totalCnt)
&amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; (city,map)
&amp;amp;nbsp; &amp;amp;nbsp; }).addSink("totalHbaseTable")

回复:flink1.9 Sql 注册的中间临时表不自动存state的吗?

Posted by zhiyezou <15...@qq.com>.
Hi


如果没有使用窗口的话,状态是不会被自动清理掉的,需要自己主动配置状态的TTL




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"star"<3149768603@qq.com&gt;;
发送时间:&nbsp;2020年6月5日(星期五) 上午10:22
收件人:&nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&gt;;

主题:&nbsp;flink1.9 Sql 注册的中间临时表不自动存state的吗?



大家好&amp;nbsp; &amp;nbsp;我使用的flink 1.9的blink planner



先按 月 和城市 去重id&amp;nbsp; 注册成一张临时表(月表) 将结果实时输出到hbase表:monthtable&amp;nbsp; &amp;nbsp;rowkey是month+city




然后基于上面这张表 再按city汇总 输出到hbase表:totalTable ,rowkey是 city






运行了近18个小时 中间有过restore和ck失败,然后我统计了&amp;nbsp; 两个表到cnt汇总值居然不一样,totalTable比monthtable小,而且还差了不了,请问这会是上面原因


伪代码如下:




val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, bsSettings)


myDataStream=......




tableEnv.registerDataStream("monthtable", myDataStream,'month','city ,'id)




//按月份,城市统计去重的id数量(id和city有关,同一个城市的id不会重复)
val monthCount = tableEnv.sqlQuery(
&amp;nbsp; &amp;nbsp; &amp;nbsp; s"""
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;select month,city,count(distinct id) as cnt from monthtable&amp;nbsp; group by month,city
&amp;nbsp; &amp;nbsp; &amp;nbsp; """.stripMargin)


//将月统计结果输出到hbase,rokey为month+city
monthCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{
&amp;nbsp; &amp;nbsp; &amp;nbsp; val row=line._2
&amp;nbsp; &amp;nbsp; &amp;nbsp; val month=row.getField(0).toString
&amp;nbsp; &amp;nbsp; &amp;nbsp; val city=row.getField(1).toString
&amp;nbsp; &amp;nbsp; &amp;nbsp; val cnt=row.getField(2).toString
&amp;nbsp; &amp;nbsp; &amp;nbsp; val map=new util.HashMap[String,String]()
&amp;nbsp; &amp;nbsp; &amp;nbsp; map.put("cnt",cnt)
&amp;nbsp; &amp;nbsp; &amp;nbsp; (month+city,map)// month+city是rowkey&amp;nbsp; cnt是一个column
&amp;nbsp; &amp;nbsp; }).addSink(new MyHbaseSink("monthHbaseTable")






&amp;nbsp;//将上面的月表注册成新表 monthStat
&amp;nbsp;tableEnv.registerTable("monthStat",monthCount)


&amp;nbsp;//按城市统计id的数量
&amp;nbsp;val totalCount = tableEnv.sqlQuery(
&amp;nbsp; &amp;nbsp; &amp;nbsp; s"""
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;|select city,sum(cnt) as cityCnt from monthStat&amp;nbsp; group by city
&amp;nbsp; &amp;nbsp; &amp;nbsp; """.stripMargin)


//将月统计结果输出到hbase,rokey为city
&amp;nbsp; &amp;nbsp; totalCount.toRetractStream[Row].filter(_._1).map(line=&amp;gt;{
&amp;nbsp; &amp;nbsp; &amp;nbsp; val row=line._2
&amp;nbsp; &amp;nbsp; &amp;nbsp; val city=row.getField(0).toString
&amp;nbsp; &amp;nbsp; &amp;nbsp; val totalCnt=row.getField(1).toString
&amp;nbsp; &amp;nbsp; &amp;nbsp; val map=new util.HashMap[String,String]()
&amp;nbsp; &amp;nbsp; &amp;nbsp; map.put("totalCnt",totalCnt)
&amp;nbsp; &amp;nbsp; &amp;nbsp; (city,map)
&amp;nbsp; &amp;nbsp; }).addSink("totalHbaseTable")