You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 徐州州 <25...@qq.com> on 2021/01/04 00:44:36 UTC

回复: flink 1.12 Cancel Job内存未释放(问)

即使我切换了,yarn-cluster模式,我23:50,通过/opt/module/hadoop3.2.1/bin/yarn application -kill application_1609656886263_0043,kill掉job,第二天1:30重启,结果还是在昨天的结果上累加的,执行的kill-job好像并不能释放state,这个真的一点办法都没有了吗?



------------------&nbsp;原始邮件&nbsp;------------------
发件人: "赵一旦"<hinobleyd@gmail.com&gt;; 
发送时间: 2020年12月29日(星期二) 晚上9:35
收件人: "user-zh"<user-zh@flink.apache.org&gt;; 
主题: Re: flink 1.12 Cancel Job内存未释放(问)



不可以吧。任务是任务。taskManager是taskManager。&nbsp; taskManager是提前启动好的一个进程,任务提交的时候会由
taskManager 帮你执行。cancel后taskManager继续它自己的事情(比如等新的任务)。
或者考虑yarn方式,per-job模式啥的。

徐州州 <25977210@qq.com&gt; 于2020年12月29日周二 上午9:00写道:

&gt; 请教一下,我flink
&gt; sql任务Cancel之后,隔一个小时后重启,还是接着Cancel的点进行累加计算的。我在IDEA中开发,代码中没有设置任何Checkpoints,请问我该如何在任务Cancel的时候同时释放掉job所使用的TaskManager内存?

回复: flink 1.12 Cancel Job内存未释放(问)

Posted by 徐州州 <25...@qq.com>.
我使用的是flink-on-yarn-cluster模式




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "徐州州"                                                                                    <25977210@qq.com&gt;;
发送时间:&nbsp;2021年1月5日(星期二) 上午9:04
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复: flink 1.12 Cancel Job内存未释放(问)



 MemoryStateBackend,我定时任务是23:57-kill掉正在执行的job,隔天的00:30通过azkaban的启动脚本,重新提交任务执行,可是我发现00:30当天计算的结果,是在昨天的基础上累加的,我查看kill掉任务的那一段时间NodeManager的内存也得到了释放,可是为什么我在00:30的时间点启动,还是会在昨天的基础上累计,而且计算的结果(并没有完全在昨天的基础上累计),比如昨天计算结果1000,今天它可能在900的结果上进行累加。请问这种情况是为什么。试了好多,都没有解决。|insert into app_bs_drainage_place
|SELECT
|     do.GrouporgName,
|     du.Name,
|     COUNT(DISTINCT dooi.Code) AS TotalSingular,
|    md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|    current_date as As_Of_Date
|FROM dw_od_order_info dooi
|  INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|  INNER JOIN dim_cc_media_placement_label_relation dmplr ON dmplr.MediaPlacementId = dwi.PlacementId
|  INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name IN ('金装驼奶', '血糖仪')
|  INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status <&gt; 60 AND dooi.Status <&gt; 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




------------------ 原始邮件 ------------------
发件人: "赵一旦"<hinobleyd@gmail.com&gt;; 
发送时间: 2021年1月4日(星期一) 晚上10:06
收件人: "user-zh"<user-zh@flink.apache.org&gt;; 
主题: Re: flink 1.12 Cancel Job内存未释放(问)



具体SQL。其实我没特别明白你表达的问题。
什么叫做释放内存,还有在之前的结果上累加。这2是什么跟什么没啥关系的东西,没听懂你表达啥。
前者是内存,后者反映的状态。如果是基于检查点/保存点重启任务,当然会保留状态,就是继续累加。

徐州州 <25977210@qq.com&gt; 于2021年1月4日周一 上午8:45写道:

&gt; 即使我切换了,yarn-cluster模式,我23:50,通过/opt/module/hadoop3.2.1/bin/yarn
&gt; application -kill
&gt; application_1609656886263_0043,kill掉job,第二天1:30重启,结果还是在昨天的结果上累加的,执行的kill-job好像并不能释放state,这个真的一点办法都没有了吗?
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人: "赵一旦"<hinobleyd@gmail.com&amp;gt;;
&gt; 发送时间: 2020年12月29日(星期二) 晚上9:35
&gt; 收件人: "user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt; 主题: Re: flink 1.12 Cancel Job内存未释放(问)
&gt;
&gt;
&gt;
&gt; 不可以吧。任务是任务。taskManager是taskManager。&amp;nbsp; taskManager是提前启动好的一个进程,任务提交的时候会由
&gt; taskManager 帮你执行。cancel后taskManager继续它自己的事情(比如等新的任务)。
&gt; 或者考虑yarn方式,per-job模式啥的。
&gt;
&gt; 徐州州 <25977210@qq.com&amp;gt; 于2020年12月29日周二 上午9:00写道:
&gt;
&gt; &amp;gt; 请教一下,我flink
&gt; &amp;gt;
&gt; sql任务Cancel之后,隔一个小时后重启,还是接着Cancel的点进行累加计算的。我在IDEA中开发,代码中没有设置任何Checkpoints,请问我该如何在任务Cancel的时候同时释放掉job所使用的TaskManager内存?

Re: 回复: flink 1.12 Cancel Job内存未释放(问)

Posted by Yun Tang <my...@live.com>.
Hi 徐州州

请查看一下checkpoint UI部分的overview,观察restored部分的是否为空,也就是没有从checkpoint恢复,同样可以观察job manager 部分日志,看是否从checkpoint resume。
如果没有从checkpoint/savepoint恢复,作业其实相当于是从头重新跑,除非作业有其他的外部访问,否则不应该有任何历史数据能看到。

祝好
唐云
________________________________
From: 徐州州 <25...@qq.com>
Sent: Tuesday, January 5, 2021 10:34
To: user-zh@flink.apache.org <us...@flink.apache.org>
Subject: 回复: flink 1.12 Cancel Job内存未释放(问)

这是我完整的配置文件,并没有设置任何状态后端,和保存点,任务kill执行的命令是/opt/module/hadoop3.2.1/bin/yarn application -kill  jobid,启动命令执行的是,/opt/module/flink1.12/bin/flink run -d -m yarn-cluster -yjm 660 -ytm 2500 -ys 3 -yqu xjia_queue -ynm App_Bs_Drainage_Launch_200105,我猜想会不会是因为队列的问题,我集群中只有一个queue队列。

------------------ 原始邮件 ------------------
发件人: "user-zh" <li...@163.com>;
发送时间: 2021年1月5日(星期二) 上午10:03
收件人: "user-zh@flink.apache.org"<us...@flink.apache.org>;
主题: 回复: flink 1.12 Cancel Job内存未释放(问)

这种情况貌似和检查点、保存点还有状态后端有关,可以排查排查,重新启动任务在昨天的基础上累加这个逻辑是正确的(如果配置了检查点、保存点还有状态后端),只是现在昨天你杀死正在执行的job的时候最后保存的状态结果和你实际的结果不一致


| |
刘海
|
|
liuhai35@163.com
|
签名由网易邮箱大师定制
在2021年1月5日 09:04,徐州州<25...@qq.com> 写道:
我一个flink-sql任务,每次隔天计算都会在昨天的计算结果上累加,我使用代码jar的方式提交,代码中设置了MemoryStateBackend,我定时任务是23:57-kill掉正在执行的job,隔天的00:30通过azkaban的启动脚本,重新提交任务执行,可是我发现00:30当天计算的结果,是在昨天的基础上累加的,我查看kill掉任务的那一段时间NodeManager的内存也得到了释放,可是为什么我在00:30的时间点启动,还是会在昨天的基础上累计,而且计算的结果(并没有完全在昨天的基础上累计),比如昨天计算结果1000,今天它可能在900的结果上进行累加。请问这种情况是为什么。试了好多,都没有解决。|insert into app_bs_drainage_place
|SELECT
|     do.GrouporgName,
|     du.Name,
|     COUNT(DISTINCT dooi.Code) AS TotalSingular,
|    md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|    current_date as As_Of_Date
|FROM dw_od_order_info dooi
|  INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|  INNER JOIN dim_cc_media_placement_label_relation dmplr ON dmplr.MediaPlacementId = dwi.PlacementId
|  INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name IN ('金装驼奶', '血糖仪')
|  INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status <&gt; 60 AND dooi.Status <&gt; 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




------------------&nbsp;原始邮件&nbsp;------------------
发件人: "赵一旦"<hinobleyd@gmail.com&gt;;
发送时间: 2021年1月4日(星期一) 晚上10:06
收件人: "user-zh"<user-zh@flink.apache.org&gt;;
主题: Re: flink 1.12 Cancel Job内存未释放(问)



具体SQL。其实我没特别明白你表达的问题。
什么叫做释放内存,还有在之前的结果上累加。这2是什么跟什么没啥关系的东西,没听懂你表达啥。
前者是内存,后者反映的状态。如果是基于检查点/保存点重启任务,当然会保留状态,就是继续累加。

徐州州 <25977210@qq.com&gt; 于2021年1月4日周一 上午8:45写道:

&gt; 即使我切换了,yarn-cluster模式,我23:50,通过/opt/module/hadoop3.2.1/bin/yarn
&gt; application -kill
&gt; application_1609656886263_0043,kill掉job,第二天1:30重启,结果还是在昨天的结果上累加的,执行的kill-job好像并不能释放state,这个真的一点办法都没有了吗?
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人: "赵一旦"<hinobleyd@gmail.com&amp;gt;;
&gt; 发送时间: 2020年12月29日(星期二) 晚上9:35
&gt; 收件人: "user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt; 主题: Re: flink 1.12 Cancel Job内存未释放(问)
&gt;
&gt;
&gt;
&gt; 不可以吧。任务是任务。taskManager是taskManager。&amp;nbsp; taskManager是提前启动好的一个进程,任务提交的时候会由
&gt; taskManager 帮你执行。cancel后taskManager继续它自己的事情(比如等新的任务)。
&gt; 或者考虑yarn方式,per-job模式啥的。
&gt;
&gt; 徐州州 <25977210@qq.com&amp;gt; 于2020年12月29日周二 上午9:00写道:
&gt;
&gt; &amp;gt; 请教一下,我flink
&gt; &amp;gt;
&gt; sql任务Cancel之后,隔一个小时后重启,还是接着Cancel的点进行累加计算的。我在IDEA中开发,代码中没有设置任何Checkpoints,请问我该如何在任务Cancel的时候同时释放掉job所使用的TaskManager内存?

回复: flink 1.12 Cancel Job内存未释放(问)

Posted by 徐州州 <25...@qq.com>.
这是我完整的配置文件,并没有设置任何状态后端,和保存点,任务kill执行的命令是/opt/module/hadoop3.2.1/bin/yarn application -kill&nbsp; jobid,启动命令执行的是,/opt/module/flink1.12/bin/flink run -d -m yarn-cluster -yjm 660 -ytm 2500 -ys 3 -yqu xjia_queue -ynm App_Bs_Drainage_Launch_200105,我猜想会不会是因为队列的问题,我集群中只有一个queue队列。


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <liuhai35@163.com&gt;;
发送时间:&nbsp;2021年1月5日(星期二) 上午10:03
收件人:&nbsp;"user-zh@flink.apache.org"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复: flink 1.12 Cancel Job内存未释放(问)



这种情况貌似和检查点、保存点还有状态后端有关,可以排查排查,重新启动任务在昨天的基础上累加这个逻辑是正确的(如果配置了检查点、保存点还有状态后端),只是现在昨天你杀死正在执行的job的时候最后保存的状态结果和你实际的结果不一致


| |
刘海
|
|
liuhai35@163.com
|
签名由网易邮箱大师定制
在2021年1月5日 09:04,徐州州<25977210@qq.com&gt; 写道:
我一个flink-sql任务,每次隔天计算都会在昨天的计算结果上累加,我使用代码jar的方式提交,代码中设置了MemoryStateBackend,我定时任务是23:57-kill掉正在执行的job,隔天的00:30通过azkaban的启动脚本,重新提交任务执行,可是我发现00:30当天计算的结果,是在昨天的基础上累加的,我查看kill掉任务的那一段时间NodeManager的内存也得到了释放,可是为什么我在00:30的时间点启动,还是会在昨天的基础上累计,而且计算的结果(并没有完全在昨天的基础上累计),比如昨天计算结果1000,今天它可能在900的结果上进行累加。请问这种情况是为什么。试了好多,都没有解决。|insert into app_bs_drainage_place
|SELECT
|&nbsp;&nbsp;&nbsp;&nbsp; do.GrouporgName,
|&nbsp;&nbsp;&nbsp;&nbsp; du.Name,
|&nbsp;&nbsp;&nbsp;&nbsp; COUNT(DISTINCT dooi.Code) AS TotalSingular,
|&nbsp;&nbsp;&nbsp; md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|&nbsp;&nbsp;&nbsp; current_date as As_Of_Date
|FROM dw_od_order_info dooi
|&nbsp; INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|&nbsp; INNER JOIN dim_cc_media_placement_label_relation dmplr ON dmplr.MediaPlacementId = dwi.PlacementId
|&nbsp; INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name IN ('金装驼奶', '血糖仪')
|&nbsp; INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status <&amp;gt; 60 AND dooi.Status <&amp;gt; 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人: "赵一旦"<hinobleyd@gmail.com&amp;gt;;
发送时间: 2021年1月4日(星期一) 晚上10:06
收件人: "user-zh"<user-zh@flink.apache.org&amp;gt;;
主题: Re: flink 1.12 Cancel Job内存未释放(问)



具体SQL。其实我没特别明白你表达的问题。
什么叫做释放内存,还有在之前的结果上累加。这2是什么跟什么没啥关系的东西,没听懂你表达啥。
前者是内存,后者反映的状态。如果是基于检查点/保存点重启任务,当然会保留状态,就是继续累加。

徐州州 <25977210@qq.com&amp;gt; 于2021年1月4日周一 上午8:45写道:

&amp;gt; 即使我切换了,yarn-cluster模式,我23:50,通过/opt/module/hadoop3.2.1/bin/yarn
&amp;gt; application -kill
&amp;gt; application_1609656886263_0043,kill掉job,第二天1:30重启,结果还是在昨天的结果上累加的,执行的kill-job好像并不能释放state,这个真的一点办法都没有了吗?
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt; ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
&amp;gt; 发件人: "赵一旦"<hinobleyd@gmail.com&amp;amp;gt;;
&amp;gt; 发送时间: 2020年12月29日(星期二) 晚上9:35
&amp;gt; 收件人: "user-zh"<user-zh@flink.apache.org&amp;amp;gt;;
&amp;gt; 主题: Re: flink 1.12 Cancel Job内存未释放(问)
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt; 不可以吧。任务是任务。taskManager是taskManager。&amp;amp;nbsp; taskManager是提前启动好的一个进程,任务提交的时候会由
&amp;gt; taskManager 帮你执行。cancel后taskManager继续它自己的事情(比如等新的任务)。
&amp;gt; 或者考虑yarn方式,per-job模式啥的。
&amp;gt;
&amp;gt; 徐州州 <25977210@qq.com&amp;amp;gt; 于2020年12月29日周二 上午9:00写道:
&amp;gt;
&amp;gt; &amp;amp;gt; 请教一下,我flink
&amp;gt; &amp;amp;gt;
&amp;gt; sql任务Cancel之后,隔一个小时后重启,还是接着Cancel的点进行累加计算的。我在IDEA中开发,代码中没有设置任何Checkpoints,请问我该如何在任务Cancel的时候同时释放掉job所使用的TaskManager内存?

回复: flink 1.12 Cancel Job内存未释放(问)

Posted by 徐州州 <25...@qq.com>.
我写的是flink-sql,with-upsert卡夫卡数据源,我状态后端使用的是MemoryStateBackend,其中设置了env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION),并没有开启检查点。我想实现每天的计算结果互不影响有什么办法吗大佬?试了很多方法都是在昨天基础上累计的,我的计算逻辑比较复杂没有进行开窗。



------------------&nbsp;原始邮件&nbsp;------------------
发件人: "刘海"<liuhai35@163.com&gt;; 
发送时间: 2021年1月5日(星期二) 上午10:03
收件人: "user-zh"<user-zh@flink.apache.org&gt;; 
主题: 回复: flink 1.12 Cancel Job内存未释放(问)



这种情况貌似和检查点、保存点还有状态后端有关,可以排查排查,重新启动任务在昨天的基础上累加这个逻辑是正确的(如果配置了检查点、保存点还有状态后端),只是现在昨天你杀死正在执行的job的时候最后保存的状态结果和你实际的结果不一致


| |
刘海
|
|
liuhai35@163.com
|
签名由网易邮箱大师定制
在2021年1月5日 09:04,徐州州<25977210@qq.com&gt; 写道:
我一个flink-sql任务,每次隔天计算都会在昨天的计算结果上累加,我使用代码jar的方式提交,代码中设置了MemoryStateBackend,我定时任务是23:57-kill掉正在执行的job,隔天的00:30通过azkaban的启动脚本,重新提交任务执行,可是我发现00:30当天计算的结果,是在昨天的基础上累加的,我查看kill掉任务的那一段时间NodeManager的内存也得到了释放,可是为什么我在00:30的时间点启动,还是会在昨天的基础上累计,而且计算的结果(并没有完全在昨天的基础上累计),比如昨天计算结果1000,今天它可能在900的结果上进行累加。请问这种情况是为什么。试了好多,都没有解决。|insert into app_bs_drainage_place
|SELECT
|&nbsp;&nbsp;&nbsp;&nbsp; do.GrouporgName,
|&nbsp;&nbsp;&nbsp;&nbsp; du.Name,
|&nbsp;&nbsp;&nbsp;&nbsp; COUNT(DISTINCT dooi.Code) AS TotalSingular,
|&nbsp;&nbsp;&nbsp; md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|&nbsp;&nbsp;&nbsp; current_date as As_Of_Date
|FROM dw_od_order_info dooi
|&nbsp; INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|&nbsp; INNER JOIN dim_cc_media_placement_label_relation dmplr ON dmplr.MediaPlacementId = dwi.PlacementId
|&nbsp; INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name IN ('金装驼奶', '血糖仪')
|&nbsp; INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status <&amp;gt; 60 AND dooi.Status <&amp;gt; 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人: "赵一旦"<hinobleyd@gmail.com&amp;gt;;
发送时间: 2021年1月4日(星期一) 晚上10:06
收件人: "user-zh"<user-zh@flink.apache.org&amp;gt;;
主题: Re: flink 1.12 Cancel Job内存未释放(问)



具体SQL。其实我没特别明白你表达的问题。
什么叫做释放内存,还有在之前的结果上累加。这2是什么跟什么没啥关系的东西,没听懂你表达啥。
前者是内存,后者反映的状态。如果是基于检查点/保存点重启任务,当然会保留状态,就是继续累加。

徐州州 <25977210@qq.com&amp;gt; 于2021年1月4日周一 上午8:45写道:

&amp;gt; 即使我切换了,yarn-cluster模式,我23:50,通过/opt/module/hadoop3.2.1/bin/yarn
&amp;gt; application -kill
&amp;gt; application_1609656886263_0043,kill掉job,第二天1:30重启,结果还是在昨天的结果上累加的,执行的kill-job好像并不能释放state,这个真的一点办法都没有了吗?
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt; ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
&amp;gt; 发件人: "赵一旦"<hinobleyd@gmail.com&amp;amp;gt;;
&amp;gt; 发送时间: 2020年12月29日(星期二) 晚上9:35
&amp;gt; 收件人: "user-zh"<user-zh@flink.apache.org&amp;amp;gt;;
&amp;gt; 主题: Re: flink 1.12 Cancel Job内存未释放(问)
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt; 不可以吧。任务是任务。taskManager是taskManager。&amp;amp;nbsp; taskManager是提前启动好的一个进程,任务提交的时候会由
&amp;gt; taskManager 帮你执行。cancel后taskManager继续它自己的事情(比如等新的任务)。
&amp;gt; 或者考虑yarn方式,per-job模式啥的。
&amp;gt;
&amp;gt; 徐州州 <25977210@qq.com&amp;amp;gt; 于2020年12月29日周二 上午9:00写道:
&amp;gt;
&amp;gt; &amp;amp;gt; 请教一下,我flink
&amp;gt; &amp;amp;gt;
&amp;gt; sql任务Cancel之后,隔一个小时后重启,还是接着Cancel的点进行累加计算的。我在IDEA中开发,代码中没有设置任何Checkpoints,请问我该如何在任务Cancel的时候同时释放掉job所使用的TaskManager内存?

回复: flink 1.12 Cancel Job内存未释放(问)

Posted by 刘海 <li...@163.com>.
这种情况貌似和检查点、保存点还有状态后端有关,可以排查排查,重新启动任务在昨天的基础上累加这个逻辑是正确的(如果配置了检查点、保存点还有状态后端),只是现在昨天你杀死正在执行的job的时候最后保存的状态结果和你实际的结果不一致


| |
刘海
|
|
liuhai35@163.com
|
签名由网易邮箱大师定制
在2021年1月5日 09:04,徐州州<25...@qq.com> 写道:
我一个flink-sql任务,每次隔天计算都会在昨天的计算结果上累加,我使用代码jar的方式提交,代码中设置了MemoryStateBackend,我定时任务是23:57-kill掉正在执行的job,隔天的00:30通过azkaban的启动脚本,重新提交任务执行,可是我发现00:30当天计算的结果,是在昨天的基础上累加的,我查看kill掉任务的那一段时间NodeManager的内存也得到了释放,可是为什么我在00:30的时间点启动,还是会在昨天的基础上累计,而且计算的结果(并没有完全在昨天的基础上累计),比如昨天计算结果1000,今天它可能在900的结果上进行累加。请问这种情况是为什么。试了好多,都没有解决。|insert into app_bs_drainage_place
|SELECT
|     do.GrouporgName,
|     du.Name,
|     COUNT(DISTINCT dooi.Code) AS TotalSingular,
|    md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|    current_date as As_Of_Date
|FROM dw_od_order_info dooi
|  INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|  INNER JOIN dim_cc_media_placement_label_relation dmplr ON dmplr.MediaPlacementId = dwi.PlacementId
|  INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name IN ('金装驼奶', '血糖仪')
|  INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status <&gt; 60 AND dooi.Status <&gt; 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




------------------&nbsp;原始邮件&nbsp;------------------
发件人: "赵一旦"<hinobleyd@gmail.com&gt;;
发送时间: 2021年1月4日(星期一) 晚上10:06
收件人: "user-zh"<user-zh@flink.apache.org&gt;;
主题: Re: flink 1.12 Cancel Job内存未释放(问)



具体SQL。其实我没特别明白你表达的问题。
什么叫做释放内存,还有在之前的结果上累加。这2是什么跟什么没啥关系的东西,没听懂你表达啥。
前者是内存,后者反映的状态。如果是基于检查点/保存点重启任务,当然会保留状态,就是继续累加。

徐州州 <25977210@qq.com&gt; 于2021年1月4日周一 上午8:45写道:

&gt; 即使我切换了,yarn-cluster模式,我23:50,通过/opt/module/hadoop3.2.1/bin/yarn
&gt; application -kill
&gt; application_1609656886263_0043,kill掉job,第二天1:30重启,结果还是在昨天的结果上累加的,执行的kill-job好像并不能释放state,这个真的一点办法都没有了吗?
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人: "赵一旦"<hinobleyd@gmail.com&amp;gt;;
&gt; 发送时间: 2020年12月29日(星期二) 晚上9:35
&gt; 收件人: "user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt; 主题: Re: flink 1.12 Cancel Job内存未释放(问)
&gt;
&gt;
&gt;
&gt; 不可以吧。任务是任务。taskManager是taskManager。&amp;nbsp; taskManager是提前启动好的一个进程,任务提交的时候会由
&gt; taskManager 帮你执行。cancel后taskManager继续它自己的事情(比如等新的任务)。
&gt; 或者考虑yarn方式,per-job模式啥的。
&gt;
&gt; 徐州州 <25977210@qq.com&amp;gt; 于2020年12月29日周二 上午9:00写道:
&gt;
&gt; &amp;gt; 请教一下,我flink
&gt; &amp;gt;
&gt; sql任务Cancel之后,隔一个小时后重启,还是接着Cancel的点进行累加计算的。我在IDEA中开发,代码中没有设置任何Checkpoints,请问我该如何在任务Cancel的时候同时释放掉job所使用的TaskManager内存?

回复: flink 1.12 Cancel Job内存未释放(问)

Posted by 徐州州 <25...@qq.com>.
我一个flink-sql任务,每次隔天计算都会在昨天的计算结果上累加,我使用代码jar的方式提交,代码中设置了MemoryStateBackend,我定时任务是23:57-kill掉正在执行的job,隔天的00:30通过azkaban的启动脚本,重新提交任务执行,可是我发现00:30当天计算的结果,是在昨天的基础上累加的,我查看kill掉任务的那一段时间NodeManager的内存也得到了释放,可是为什么我在00:30的时间点启动,还是会在昨天的基础上累计,而且计算的结果(并没有完全在昨天的基础上累计),比如昨天计算结果1000,今天它可能在900的结果上进行累加。请问这种情况是为什么。试了好多,都没有解决。|insert into app_bs_drainage_place
|SELECT
|     do.GrouporgName,
|     du.Name,
|     COUNT(DISTINCT dooi.Code) AS TotalSingular,
|    md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|    current_date as As_Of_Date
|FROM dw_od_order_info dooi
|  INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|  INNER JOIN dim_cc_media_placement_label_relation dmplr ON dmplr.MediaPlacementId = dwi.PlacementId
|  INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name IN ('金装驼奶', '血糖仪')
|  INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status <&gt; 60 AND dooi.Status <&gt; 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




------------------&nbsp;原始邮件&nbsp;------------------
发件人: "赵一旦"<hinobleyd@gmail.com&gt;; 
发送时间: 2021年1月4日(星期一) 晚上10:06
收件人: "user-zh"<user-zh@flink.apache.org&gt;; 
主题: Re: flink 1.12 Cancel Job内存未释放(问)



具体SQL。其实我没特别明白你表达的问题。
什么叫做释放内存,还有在之前的结果上累加。这2是什么跟什么没啥关系的东西,没听懂你表达啥。
前者是内存,后者反映的状态。如果是基于检查点/保存点重启任务,当然会保留状态,就是继续累加。

徐州州 <25977210@qq.com&gt; 于2021年1月4日周一 上午8:45写道:

&gt; 即使我切换了,yarn-cluster模式,我23:50,通过/opt/module/hadoop3.2.1/bin/yarn
&gt; application -kill
&gt; application_1609656886263_0043,kill掉job,第二天1:30重启,结果还是在昨天的结果上累加的,执行的kill-job好像并不能释放state,这个真的一点办法都没有了吗?
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人: "赵一旦"<hinobleyd@gmail.com&amp;gt;;
&gt; 发送时间: 2020年12月29日(星期二) 晚上9:35
&gt; 收件人: "user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt; 主题: Re: flink 1.12 Cancel Job内存未释放(问)
&gt;
&gt;
&gt;
&gt; 不可以吧。任务是任务。taskManager是taskManager。&amp;nbsp; taskManager是提前启动好的一个进程,任务提交的时候会由
&gt; taskManager 帮你执行。cancel后taskManager继续它自己的事情(比如等新的任务)。
&gt; 或者考虑yarn方式,per-job模式啥的。
&gt;
&gt; 徐州州 <25977210@qq.com&amp;gt; 于2020年12月29日周二 上午9:00写道:
&gt;
&gt; &amp;gt; 请教一下,我flink
&gt; &amp;gt;
&gt; sql任务Cancel之后,隔一个小时后重启,还是接着Cancel的点进行累加计算的。我在IDEA中开发,代码中没有设置任何Checkpoints,请问我该如何在任务Cancel的时候同时释放掉job所使用的TaskManager内存?

Re: flink 1.12 Cancel Job内存未释放(问)

Posted by 赵一旦 <hi...@gmail.com>.
具体SQL。其实我没特别明白你表达的问题。
什么叫做释放内存,还有在之前的结果上累加。这2是什么跟什么没啥关系的东西,没听懂你表达啥。
前者是内存,后者反映的状态。如果是基于检查点/保存点重启任务,当然会保留状态,就是继续累加。

徐州州 <25...@qq.com> 于2021年1月4日周一 上午8:45写道:

> 即使我切换了,yarn-cluster模式,我23:50,通过/opt/module/hadoop3.2.1/bin/yarn
> application -kill
> application_1609656886263_0043,kill掉job,第二天1:30重启,结果还是在昨天的结果上累加的,执行的kill-job好像并不能释放state,这个真的一点办法都没有了吗?
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人: "赵一旦"<hinobleyd@gmail.com&gt;;
> 发送时间: 2020年12月29日(星期二) 晚上9:35
> 收件人: "user-zh"<user-zh@flink.apache.org&gt;;
> 主题: Re: flink 1.12 Cancel Job内存未释放(问)
>
>
>
> 不可以吧。任务是任务。taskManager是taskManager。&nbsp; taskManager是提前启动好的一个进程,任务提交的时候会由
> taskManager 帮你执行。cancel后taskManager继续它自己的事情(比如等新的任务)。
> 或者考虑yarn方式,per-job模式啥的。
>
> 徐州州 <25977210@qq.com&gt; 于2020年12月29日周二 上午9:00写道:
>
> &gt; 请教一下,我flink
> &gt;
> sql任务Cancel之后,隔一个小时后重启,还是接着Cancel的点进行累加计算的。我在IDEA中开发,代码中没有设置任何Checkpoints,请问我该如何在任务Cancel的时候同时释放掉job所使用的TaskManager内存?