You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by kcz <57...@qq.com> on 2020/06/10 03:27:21 UTC

回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

sql可以指定 operatorID吗?我突然发现我的代码没有指定这个。。。而且我还没找到指定ID的文档。




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"方盛凯"<fskmine@gmail.com&gt;;
发送时间:&nbsp;2020年6月10日(星期三) 中午11:00
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯 <fskmine@gmail.com&gt; 于2020年6月9日周二 下午9:26写道:

&gt;
&gt; 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
&gt; 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
&gt; https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
&gt;
&gt; 如有错误,欢迎补充回答。
&gt;
&gt; 陈赋赟 <astion_leo@163.com&gt; 于2020年6月8日周一 上午11:53写道:
&gt;
&gt;&gt; 原先sql任务是:
&gt;&gt; CREATE TABLE A_source(...)
&gt;&gt; CREATE TABLE B_sink (...)
&gt;&gt; INSERT INTO B_sink
&gt;&gt; SELECT
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 1
&gt;&gt; FROM
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; A_source
&gt;&gt; ;
&gt;&gt; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
&gt;&gt;
&gt;&gt;
&gt;&gt; CREATE TABLE A_source(...)
&gt;&gt; CREATE TABLE B_sink (...)
&gt;&gt; CREATE TABLE C_source(...)
&gt;&gt; CREATE TABLE D_sink (...)
&gt;&gt; INSERT INTO B_sink
&gt;&gt; SELECT
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 1
&gt;&gt; FROM
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; A_source
&gt;&gt; ;
&gt;&gt;
&gt;&gt;
&gt;&gt; INSERT INTO C_sink
&gt;&gt; SELECT
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 1
&gt;&gt; FROM
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; D_source
&gt;&gt; ;
&gt;&gt; 并基于Savepoint提交,结果显示
&gt;&gt;
&gt;&gt; Cannot map checkpoint/savepoint state for operator
&gt;&gt; 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
&gt;&gt; is not available in the new program.
&gt;&gt; If you want to allow to skip this, you can set the
&gt;&gt; --allowNonRestoredState option on the CLI.
&gt;&gt;
&gt;&gt;
&gt;&gt; 想请教一下底层是因为什么原因导致了opertor匹配不上?
&gt;
&gt;

回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

Posted by kcz <57...@qq.com>.
嗯呢tks,收到。




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Yichao Yang"<1048262223@qq.com&gt;;
发送时间:&nbsp;2020年6月10日(星期三) 中午11:32
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



Hi


Flink sql 目前不支持给算子自定义uid的。如果这种sql修改比较频繁,建议使用datastream api来支持。


Best,
Yichao Yang




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&amp;nbsp;"kcz"<573693104@qq.com&amp;gt;;
发送时间:&amp;nbsp;2020年6月10日(星期三) 中午11:27
收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;

主题:&amp;nbsp;回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



sql可以指定 operatorID吗?我突然发现我的代码没有指定这个。。。而且我还没找到指定ID的文档。




------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
发件人:&amp;amp;nbsp;"方盛凯"<fskmine@gmail.com&amp;amp;gt;;
发送时间:&amp;amp;nbsp;2020年6月10日(星期三) 中午11:00
收件人:&amp;amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;amp;gt;;

主题:&amp;amp;nbsp;Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯 <fskmine@gmail.com&amp;amp;gt; 于2020年6月9日周二 下午9:26写道:

&amp;amp;gt;
&amp;amp;gt; 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
&amp;amp;gt; 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
&amp;amp;gt; https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
&amp;amp;gt;
&amp;amp;gt; 如有错误,欢迎补充回答。
&amp;amp;gt;
&amp;amp;gt; 陈赋赟 <astion_leo@163.com&amp;amp;gt; 于2020年6月8日周一 上午11:53写道:
&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt; 原先sql任务是:
&amp;amp;gt;&amp;amp;gt; CREATE TABLE A_source(...)
&amp;amp;gt;&amp;amp;gt; CREATE TABLE B_sink (...)
&amp;amp;gt;&amp;amp;gt; INSERT INTO B_sink
&amp;amp;gt;&amp;amp;gt; SELECT
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 1
&amp;amp;gt;&amp;amp;gt; FROM
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; A_source
&amp;amp;gt;&amp;amp;gt; ;
&amp;amp;gt;&amp;amp;gt; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt; CREATE TABLE A_source(...)
&amp;amp;gt;&amp;amp;gt; CREATE TABLE B_sink (...)
&amp;amp;gt;&amp;amp;gt; CREATE TABLE C_source(...)
&amp;amp;gt;&amp;amp;gt; CREATE TABLE D_sink (...)
&amp;amp;gt;&amp;amp;gt; INSERT INTO B_sink
&amp;amp;gt;&amp;amp;gt; SELECT
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 1
&amp;amp;gt;&amp;amp;gt; FROM
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; A_source
&amp;amp;gt;&amp;amp;gt; ;
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt; INSERT INTO C_sink
&amp;amp;gt;&amp;amp;gt; SELECT
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 1
&amp;amp;gt;&amp;amp;gt; FROM
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; D_source
&amp;amp;gt;&amp;amp;gt; ;
&amp;amp;gt;&amp;amp;gt; 并基于Savepoint提交,结果显示
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt; Cannot map checkpoint/savepoint state for operator
&amp;amp;gt;&amp;amp;gt; 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
&amp;amp;gt;&amp;amp;gt; is not available in the new program.
&amp;amp;gt;&amp;amp;gt; If you want to allow to skip this, you can set the
&amp;amp;gt;&amp;amp;gt; --allowNonRestoredState option on the CLI.
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt; 想请教一下底层是因为什么原因导致了opertor匹配不上?
&amp;amp;gt;
&amp;amp;gt;

回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

Posted by Yichao Yang <10...@qq.com>.
Hi


Flink sql 目前不支持给算子自定义uid的。如果这种sql修改比较频繁,建议使用datastream api来支持。


Best,
Yichao Yang




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"kcz"<573693104@qq.com&gt;;
发送时间:&nbsp;2020年6月10日(星期三) 中午11:27
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



sql可以指定 operatorID吗?我突然发现我的代码没有指定这个。。。而且我还没找到指定ID的文档。




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&amp;nbsp;"方盛凯"<fskmine@gmail.com&amp;gt;;
发送时间:&amp;nbsp;2020年6月10日(星期三) 中午11:00
收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;

主题:&amp;nbsp;Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯 <fskmine@gmail.com&amp;gt; 于2020年6月9日周二 下午9:26写道:

&amp;gt;
&amp;gt; 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
&amp;gt; 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
&amp;gt; https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
&amp;gt;
&amp;gt; 如有错误,欢迎补充回答。
&amp;gt;
&amp;gt; 陈赋赟 <astion_leo@163.com&amp;gt; 于2020年6月8日周一 上午11:53写道:
&amp;gt;
&amp;gt;&amp;gt; 原先sql任务是:
&amp;gt;&amp;gt; CREATE TABLE A_source(...)
&amp;gt;&amp;gt; CREATE TABLE B_sink (...)
&amp;gt;&amp;gt; INSERT INTO B_sink
&amp;gt;&amp;gt; SELECT
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 1
&amp;gt;&amp;gt; FROM
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; A_source
&amp;gt;&amp;gt; ;
&amp;gt;&amp;gt; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
&amp;gt;&amp;gt;
&amp;gt;&amp;gt;
&amp;gt;&amp;gt; CREATE TABLE A_source(...)
&amp;gt;&amp;gt; CREATE TABLE B_sink (...)
&amp;gt;&amp;gt; CREATE TABLE C_source(...)
&amp;gt;&amp;gt; CREATE TABLE D_sink (...)
&amp;gt;&amp;gt; INSERT INTO B_sink
&amp;gt;&amp;gt; SELECT
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 1
&amp;gt;&amp;gt; FROM
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; A_source
&amp;gt;&amp;gt; ;
&amp;gt;&amp;gt;
&amp;gt;&amp;gt;
&amp;gt;&amp;gt; INSERT INTO C_sink
&amp;gt;&amp;gt; SELECT
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 1
&amp;gt;&amp;gt; FROM
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; D_source
&amp;gt;&amp;gt; ;
&amp;gt;&amp;gt; 并基于Savepoint提交,结果显示
&amp;gt;&amp;gt;
&amp;gt;&amp;gt; Cannot map checkpoint/savepoint state for operator
&amp;gt;&amp;gt; 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
&amp;gt;&amp;gt; is not available in the new program.
&amp;gt;&amp;gt; If you want to allow to skip this, you can set the
&amp;gt;&amp;gt; --allowNonRestoredState option on the CLI.
&amp;gt;&amp;gt;
&amp;gt;&amp;gt;
&amp;gt;&amp;gt; 想请教一下底层是因为什么原因导致了opertor匹配不上?
&amp;gt;
&amp;gt;