You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 陈赋赟 <as...@163.com> on 2020/06/08 03:53:20 UTC

关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

原先sql任务是:
CREATE TABLE A_source(...)
CREATE TABLE B_sink (...)
INSERT INTO B_sink
SELECT
     1
FROM 
    A_source
;
我基于这个FlinkSQL任务生成了savepoint后,我重新修改为


CREATE TABLE A_source(...)
CREATE TABLE B_sink (...)
CREATE TABLE C_source(...)
CREATE TABLE D_sink (...)
INSERT INTO B_sink
SELECT
     1
FROM 
    A_source
;


INSERT INTO C_sink
SELECT
     1
FROM 
    D_source
;
并基于Savepoint提交,结果显示

Cannot map checkpoint/savepoint state for operator 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator is not available in the new program. 
If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.


想请教一下底层是因为什么原因导致了opertor匹配不上?

回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

Posted by kcz <57...@qq.com>.
嗯呢tks,收到。




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Yichao Yang"<1048262223@qq.com&gt;;
发送时间:&nbsp;2020年6月10日(星期三) 中午11:32
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



Hi


Flink sql 目前不支持给算子自定义uid的。如果这种sql修改比较频繁,建议使用datastream api来支持。


Best,
Yichao Yang




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&amp;nbsp;"kcz"<573693104@qq.com&amp;gt;;
发送时间:&amp;nbsp;2020年6月10日(星期三) 中午11:27
收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;

主题:&amp;nbsp;回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



sql可以指定 operatorID吗?我突然发现我的代码没有指定这个。。。而且我还没找到指定ID的文档。




------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
发件人:&amp;amp;nbsp;"方盛凯"<fskmine@gmail.com&amp;amp;gt;;
发送时间:&amp;amp;nbsp;2020年6月10日(星期三) 中午11:00
收件人:&amp;amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;amp;gt;;

主题:&amp;amp;nbsp;Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯 <fskmine@gmail.com&amp;amp;gt; 于2020年6月9日周二 下午9:26写道:

&amp;amp;gt;
&amp;amp;gt; 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
&amp;amp;gt; 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
&amp;amp;gt; https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
&amp;amp;gt;
&amp;amp;gt; 如有错误,欢迎补充回答。
&amp;amp;gt;
&amp;amp;gt; 陈赋赟 <astion_leo@163.com&amp;amp;gt; 于2020年6月8日周一 上午11:53写道:
&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt; 原先sql任务是:
&amp;amp;gt;&amp;amp;gt; CREATE TABLE A_source(...)
&amp;amp;gt;&amp;amp;gt; CREATE TABLE B_sink (...)
&amp;amp;gt;&amp;amp;gt; INSERT INTO B_sink
&amp;amp;gt;&amp;amp;gt; SELECT
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 1
&amp;amp;gt;&amp;amp;gt; FROM
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; A_source
&amp;amp;gt;&amp;amp;gt; ;
&amp;amp;gt;&amp;amp;gt; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt; CREATE TABLE A_source(...)
&amp;amp;gt;&amp;amp;gt; CREATE TABLE B_sink (...)
&amp;amp;gt;&amp;amp;gt; CREATE TABLE C_source(...)
&amp;amp;gt;&amp;amp;gt; CREATE TABLE D_sink (...)
&amp;amp;gt;&amp;amp;gt; INSERT INTO B_sink
&amp;amp;gt;&amp;amp;gt; SELECT
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 1
&amp;amp;gt;&amp;amp;gt; FROM
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; A_source
&amp;amp;gt;&amp;amp;gt; ;
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt; INSERT INTO C_sink
&amp;amp;gt;&amp;amp;gt; SELECT
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 1
&amp;amp;gt;&amp;amp;gt; FROM
&amp;amp;gt;&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; D_source
&amp;amp;gt;&amp;amp;gt; ;
&amp;amp;gt;&amp;amp;gt; 并基于Savepoint提交,结果显示
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt; Cannot map checkpoint/savepoint state for operator
&amp;amp;gt;&amp;amp;gt; 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
&amp;amp;gt;&amp;amp;gt; is not available in the new program.
&amp;amp;gt;&amp;amp;gt; If you want to allow to skip this, you can set the
&amp;amp;gt;&amp;amp;gt; --allowNonRestoredState option on the CLI.
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt;
&amp;amp;gt;&amp;amp;gt; 想请教一下底层是因为什么原因导致了opertor匹配不上?
&amp;amp;gt;
&amp;amp;gt;

回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

Posted by Yichao Yang <10...@qq.com>.
Hi


Flink sql 目前不支持给算子自定义uid的。如果这种sql修改比较频繁,建议使用datastream api来支持。


Best,
Yichao Yang




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"kcz"<573693104@qq.com&gt;;
发送时间:&nbsp;2020年6月10日(星期三) 中午11:27
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



sql可以指定 operatorID吗?我突然发现我的代码没有指定这个。。。而且我还没找到指定ID的文档。




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&amp;nbsp;"方盛凯"<fskmine@gmail.com&amp;gt;;
发送时间:&amp;nbsp;2020年6月10日(星期三) 中午11:00
收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;

主题:&amp;nbsp;Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯 <fskmine@gmail.com&amp;gt; 于2020年6月9日周二 下午9:26写道:

&amp;gt;
&amp;gt; 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
&amp;gt; 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
&amp;gt; https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
&amp;gt;
&amp;gt; 如有错误,欢迎补充回答。
&amp;gt;
&amp;gt; 陈赋赟 <astion_leo@163.com&amp;gt; 于2020年6月8日周一 上午11:53写道:
&amp;gt;
&amp;gt;&amp;gt; 原先sql任务是:
&amp;gt;&amp;gt; CREATE TABLE A_source(...)
&amp;gt;&amp;gt; CREATE TABLE B_sink (...)
&amp;gt;&amp;gt; INSERT INTO B_sink
&amp;gt;&amp;gt; SELECT
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 1
&amp;gt;&amp;gt; FROM
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; A_source
&amp;gt;&amp;gt; ;
&amp;gt;&amp;gt; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
&amp;gt;&amp;gt;
&amp;gt;&amp;gt;
&amp;gt;&amp;gt; CREATE TABLE A_source(...)
&amp;gt;&amp;gt; CREATE TABLE B_sink (...)
&amp;gt;&amp;gt; CREATE TABLE C_source(...)
&amp;gt;&amp;gt; CREATE TABLE D_sink (...)
&amp;gt;&amp;gt; INSERT INTO B_sink
&amp;gt;&amp;gt; SELECT
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 1
&amp;gt;&amp;gt; FROM
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; A_source
&amp;gt;&amp;gt; ;
&amp;gt;&amp;gt;
&amp;gt;&amp;gt;
&amp;gt;&amp;gt; INSERT INTO C_sink
&amp;gt;&amp;gt; SELECT
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 1
&amp;gt;&amp;gt; FROM
&amp;gt;&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; D_source
&amp;gt;&amp;gt; ;
&amp;gt;&amp;gt; 并基于Savepoint提交,结果显示
&amp;gt;&amp;gt;
&amp;gt;&amp;gt; Cannot map checkpoint/savepoint state for operator
&amp;gt;&amp;gt; 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
&amp;gt;&amp;gt; is not available in the new program.
&amp;gt;&amp;gt; If you want to allow to skip this, you can set the
&amp;gt;&amp;gt; --allowNonRestoredState option on the CLI.
&amp;gt;&amp;gt;
&amp;gt;&amp;gt;
&amp;gt;&amp;gt; 想请教一下底层是因为什么原因导致了opertor匹配不上?
&amp;gt;
&amp;gt;

回复: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

Posted by kcz <57...@qq.com>.
sql可以指定 operatorID吗?我突然发现我的代码没有指定这个。。。而且我还没找到指定ID的文档。




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"方盛凯"<fskmine@gmail.com&gt;;
发送时间:&nbsp;2020年6月10日(星期三) 中午11:00
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题



我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯 <fskmine@gmail.com&gt; 于2020年6月9日周二 下午9:26写道:

&gt;
&gt; 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
&gt; 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
&gt; https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
&gt;
&gt; 如有错误,欢迎补充回答。
&gt;
&gt; 陈赋赟 <astion_leo@163.com&gt; 于2020年6月8日周一 上午11:53写道:
&gt;
&gt;&gt; 原先sql任务是:
&gt;&gt; CREATE TABLE A_source(...)
&gt;&gt; CREATE TABLE B_sink (...)
&gt;&gt; INSERT INTO B_sink
&gt;&gt; SELECT
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 1
&gt;&gt; FROM
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; A_source
&gt;&gt; ;
&gt;&gt; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
&gt;&gt;
&gt;&gt;
&gt;&gt; CREATE TABLE A_source(...)
&gt;&gt; CREATE TABLE B_sink (...)
&gt;&gt; CREATE TABLE C_source(...)
&gt;&gt; CREATE TABLE D_sink (...)
&gt;&gt; INSERT INTO B_sink
&gt;&gt; SELECT
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 1
&gt;&gt; FROM
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; A_source
&gt;&gt; ;
&gt;&gt;
&gt;&gt;
&gt;&gt; INSERT INTO C_sink
&gt;&gt; SELECT
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 1
&gt;&gt; FROM
&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; D_source
&gt;&gt; ;
&gt;&gt; 并基于Savepoint提交,结果显示
&gt;&gt;
&gt;&gt; Cannot map checkpoint/savepoint state for operator
&gt;&gt; 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
&gt;&gt; is not available in the new program.
&gt;&gt; If you want to allow to skip this, you can set the
&gt;&gt; --allowNonRestoredState option on the CLI.
&gt;&gt;
&gt;&gt;
&gt;&gt; 想请教一下底层是因为什么原因导致了opertor匹配不上?
&gt;
&gt;

Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

Posted by 方盛凯 <fs...@gmail.com>.
我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯 <fs...@gmail.com> 于2020年6月9日周二 下午9:26写道:

>
> 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
> 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
>
> 如有错误,欢迎补充回答。
>
> 陈赋赟 <as...@163.com> 于2020年6月8日周一 上午11:53写道:
>
>> 原先sql任务是:
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>      1
>> FROM
>>     A_source
>> ;
>> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
>>
>>
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> CREATE TABLE C_source(...)
>> CREATE TABLE D_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>      1
>> FROM
>>     A_source
>> ;
>>
>>
>> INSERT INTO C_sink
>> SELECT
>>      1
>> FROM
>>     D_source
>> ;
>> 并基于Savepoint提交,结果显示
>>
>> Cannot map checkpoint/savepoint state for operator
>> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
>> is not available in the new program.
>> If you want to allow to skip this, you can set the
>> --allowNonRestoredState option on the CLI.
>>
>>
>> 想请教一下底层是因为什么原因导致了opertor匹配不上?
>
>

Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

Posted by 方盛凯 <fs...@gmail.com>.
可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

如有错误,欢迎补充回答。

陈赋赟 <as...@163.com> 于2020年6月8日周一 上午11:53写道:

> 原先sql任务是:
> CREATE TABLE A_source(...)
> CREATE TABLE B_sink (...)
> INSERT INTO B_sink
> SELECT
>      1
> FROM
>     A_source
> ;
> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
>
>
> CREATE TABLE A_source(...)
> CREATE TABLE B_sink (...)
> CREATE TABLE C_source(...)
> CREATE TABLE D_sink (...)
> INSERT INTO B_sink
> SELECT
>      1
> FROM
>     A_source
> ;
>
>
> INSERT INTO C_sink
> SELECT
>      1
> FROM
>     D_source
> ;
> 并基于Savepoint提交,结果显示
>
> Cannot map checkpoint/savepoint state for operator
> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
> is not available in the new program.
> If you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.
>
>
> 想请教一下底层是因为什么原因导致了opertor匹配不上?