You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by guanyq <dl...@163.com> on 2023/03/09 14:06:17 UTC

flink on yarn 异常停电问题咨询

前提
1.flink配置了高可用
2.flink配置checkpoint数为10
3.yarn集群配置了任务恢复
疑问
yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动




flink1.17.1版本 flink on yarn 提交无法获取配置文件

Posted by guanyq <dl...@163.com>.
/opt/flink/flink-1.17.1/bin/flink run-application -t yarn-application -yjm 1024m -ytm 1024m ./xx-1.0.jar ./config.properties以上提交命令制定的配置文件,为什么在容器内找配置文件?file /home/yarn/nm/usercache/root/appcache/application_1690773368385_0092/container_e183_1690773368385_0092_01_000001/./config.properties does not exist

flink1.10.1版本,读取日志目录问题

Posted by guanyq <dl...@163.com>.
请问下
flink如何监控并实时读取远程服务器的日志目录中所有日志文件内容

日志服务器(可以ssh连接,IP/用户名/密码)

flink1.17.1版本

Posted by guanyq <dl...@163.com>.
请问,flink sql 能否通过sql语句将mysql表加载为flink 内存表
sql语句为多表关联



Re:Re: flink1.16 sql gateway hive2

Posted by guanyq <dl...@163.com>.


感谢两位老师





在 2023-03-27 09:58:24,"Shengkai Fang" <fs...@gmail.com> 写道:
>方勇老师说的没错。我们在文档里面也加了如何配置 hiveserver2 endpoint 的文档[1]
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/hive-compatibility/hiveserver2/#setting-up
>
>Shammon FY <zj...@gmail.com> 于2023年3月27日周一 08:41写道:
>
>> Hi
>>
>>
>> 如果要启动hiveserver2协议的gateway,需要将jar包flink-connector-hive_${scala.binary.version}放入到gateway的lib目录
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Mar 26, 2023 at 12:07 PM guanyq <dl...@163.com> wrote:
>>
>> > 本地启动了flink及hive在启动sql gateway时有以下异常,请问还需要其他什么操作么
>> > ./bin/sql-gateway.sh start-foreground
>> > -Dsql-gateway.endpoint.type=hiveserver2
>> >
>> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/usr/local/app/apache-hive-3.1.2-bin/conf
>> >
>> >
>> > 异常信息
>> >
>> > Available factory identifiers are:
>> > rest
>> > at
>> >
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
>> > ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
>> > at
>> >
>> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
>> > ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
>> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
>> > [flink-sql-gateway-1.16.0.jar:1.16.0]
>> > at
>> >
>> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
>> > [flink-sql-gateway-1.16.0.jar:1.16.0]
>> > at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
>> > [flink-sql-gateway-1.16.0.jar:1.16.0]
>> > Exception in thread "main"
>> > org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
>> > start the endpoints.
>> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:79)
>> > at
>> >
>> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
>> > at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
>> > Caused by: org.apache.flink.table.api.ValidationException: Could not find
>> > any factory for identifier 'hiveserver2' that implements
>> > 'SqlGatewayEndpointFactory' in the classpath.
>> > Available factory identifiers are:
>> > rest
>> > at
>> >
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
>> > at
>> >
>> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
>> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
>> > ... 2 more
>> >
>> >
>>

Re: flink1.17.1版本 MiniCluster is not yet running or has already been shut down.

Posted by Shammon FY <zj...@gmail.com>.
Hi,

运行的是哪个例子?从错误上看是在从MiniCluster获取结果的时候,MiniCluster被关闭了

Best,
Shammon FY

On Sat, Jul 22, 2023 at 3:25 PM guanyq <dl...@163.com> wrote:

> 本地IDEA运行 MiniCluster is not yet running or has already been shut down.
> 请问是什么原因,如何处理
>
>
>
>
> 15:19:27,511 INFO
> org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] -
> Stopping resource manager service.
>
> 15:19:27,503 WARN
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] -
> Failed to get job status so we assume that the job has terminated. Some
> data might be lost.
>
> java.lang.IllegalStateException: MiniCluster is not yet running or has
> already been shut down.
>
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> ~[flink-core-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1060)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:933)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:857)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
> [flink-table-planner_25e35ab8-6377-4c6a-a928-a9fe1ff9e7f4.jar:1.17.1]
>
> at
> org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
> [flink-table-common-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
> [flink-table-api-java-1.17.1.jar:1.17.1]

flink1.17.1版本 MiniCluster is not yet running or has already been shut down.

Posted by guanyq <dl...@163.com>.
本地IDEA运行 MiniCluster is not yet running or has already been shut down. 请问是什么原因,如何处理




15:19:27,511 INFO  org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Stopping resource manager service.

15:19:27,503 WARN  org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - Failed to get job status so we assume that the job has terminated. Some data might be lost.

java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.

at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-core-1.17.1.jar:1.17.1]

at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1060) ~[flink-runtime-1.17.1.jar:1.17.1]

at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:933) ~[flink-runtime-1.17.1.jar:1.17.1]

at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:857) ~[flink-runtime-1.17.1.jar:1.17.1]

at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91) ~[flink-runtime-1.17.1.jar:1.17.1]

at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210) [flink-streaming-java-1.17.1.jar:1.17.1]

at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118) [flink-streaming-java-1.17.1.jar:1.17.1]

at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [flink-streaming-java-1.17.1.jar:1.17.1]

at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [flink-streaming-java-1.17.1.jar:1.17.1]

at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) [flink-table-planner_25e35ab8-6377-4c6a-a928-a9fe1ff9e7f4.jar:1.17.1]

at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120) [flink-table-common-1.17.1.jar:1.17.1]

at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153) [flink-table-api-java-1.17.1.jar:1.17.1]

Re: flink1.16 sql gateway hive2

Posted by Shengkai Fang <fs...@gmail.com>.
方勇老师说的没错。我们在文档里面也加了如何配置 hiveserver2 endpoint 的文档[1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/hive-compatibility/hiveserver2/#setting-up

Shammon FY <zj...@gmail.com> 于2023年3月27日周一 08:41写道:

> Hi
>
>
> 如果要启动hiveserver2协议的gateway,需要将jar包flink-connector-hive_${scala.binary.version}放入到gateway的lib目录
>
> Best,
> Shammon FY
>
>
> On Sun, Mar 26, 2023 at 12:07 PM guanyq <dl...@163.com> wrote:
>
> > 本地启动了flink及hive在启动sql gateway时有以下异常,请问还需要其他什么操作么
> > ./bin/sql-gateway.sh start-foreground
> > -Dsql-gateway.endpoint.type=hiveserver2
> >
> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/usr/local/app/apache-hive-3.1.2-bin/conf
> >
> >
> > 异常信息
> >
> > Available factory identifiers are:
> > rest
> > at
> >
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
> > ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
> > at
> >
> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
> > ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
> > [flink-sql-gateway-1.16.0.jar:1.16.0]
> > at
> >
> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
> > [flink-sql-gateway-1.16.0.jar:1.16.0]
> > at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
> > [flink-sql-gateway-1.16.0.jar:1.16.0]
> > Exception in thread "main"
> > org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
> > start the endpoints.
> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:79)
> > at
> >
> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
> > at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
> > Caused by: org.apache.flink.table.api.ValidationException: Could not find
> > any factory for identifier 'hiveserver2' that implements
> > 'SqlGatewayEndpointFactory' in the classpath.
> > Available factory identifiers are:
> > rest
> > at
> >
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
> > at
> >
> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
> > ... 2 more
> >
> >
>

Re: flink1.16 sql gateway hive2

Posted by Shammon FY <zj...@gmail.com>.
Hi

如果要启动hiveserver2协议的gateway,需要将jar包flink-connector-hive_${scala.binary.version}放入到gateway的lib目录

Best,
Shammon FY


On Sun, Mar 26, 2023 at 12:07 PM guanyq <dl...@163.com> wrote:

> 本地启动了flink及hive在启动sql gateway时有以下异常,请问还需要其他什么操作么
> ./bin/sql-gateway.sh start-foreground
> -Dsql-gateway.endpoint.type=hiveserver2
> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/usr/local/app/apache-hive-3.1.2-bin/conf
>
>
> 异常信息
>
> Available factory identifiers are:
> rest
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
> ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
> at
> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
> ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
> at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
> [flink-sql-gateway-1.16.0.jar:1.16.0]
> at
> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
> [flink-sql-gateway-1.16.0.jar:1.16.0]
> at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
> [flink-sql-gateway-1.16.0.jar:1.16.0]
> Exception in thread "main"
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
> start the endpoints.
> at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:79)
> at
> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
> at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'hiveserver2' that implements
> 'SqlGatewayEndpointFactory' in the classpath.
> Available factory identifiers are:
> rest
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
> at
> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
> at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
> ... 2 more
>
>

flink1.16 sql gateway hive2

Posted by guanyq <dl...@163.com>.
本地启动了flink及hive在启动sql gateway时有以下异常,请问还需要其他什么操作么
./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.type=hiveserver2 -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/usr/local/app/apache-hive-3.1.2-bin/conf


异常信息

Available factory identifiers are:
rest
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72) [flink-sql-gateway-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118) [flink-sql-gateway-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98) [flink-sql-gateway-1.16.0.jar:1.16.0]
Exception in thread "main" org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to start the endpoints.
at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:79)
at org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hiveserver2' that implements 'SqlGatewayEndpointFactory' in the classpath.
Available factory identifiers are:
rest
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
at org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
... 2 more


Re:Re: Re: Re: Re: flink on yarn 异常停电问题咨询

Posted by guanyq <dl...@163.com>.
我昨天模拟下断电的情况
10个ha文件的日期是错开的5秒一个
chk-xxx也不是都损坏了,有的是可以启动的,这个我也试了 现在情况是 yarn集群停电重启首先会循环尝试从10个ha的文件中启动应用,ha文件记录的chk的相关原数据 1.如果ha文件都损坏了,即使chk没有损坏,flink应用也是拉不起来的

现在想的是让hdfs上存在至少1组个可用的的ha文件及其对应的chk 现在是5秒一个chk,保存了10个,也会出现损坏无法启动的问题 5秒*10 = 50秒,也想知道多长时间的存档才能保证存在一组没有损坏ha和chk呢。














在 2023-03-14 10:16:48,"Guojun Li" <gj...@gmail.com> 写道:
>Hi
>
>确认一下这些 ha 文件的 last modification time 是一致的还是错开的?
>
>另外,指定 chk- 恢复尝试了没有?可以恢复吗?
>
>Best,
>Guojun
>
>On Fri, Mar 10, 2023 at 11:56 AM guanyq <dl...@163.com> wrote:
>
>> flink ha路径为 /tmp/flink/ha/
>> flink chk路径为 /tmp/flink/checkpoint
>>
>>
>> 我现在不确定是这个ha的文件损坏了,还是所有chk都损坏,但是这个需要模拟验证一下。
>>
>>
>>
>>
>> 会尝试从10个chk恢复,日志有打印
>> 2023-03-0718:37:43,703INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Recovering checkpoints from ZooKeeper.
>> 2023-03-0718:37:43,730INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Found 10 checkpoints in ZooKeeper.
>> 2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to fetch 10 checkpoints from storage.
>> 2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7079.
>> 2023-03-0718:37:43,837INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7080.
>> 2023-03-0718:37:43,868INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7081.
>> 2023-03-0718:37:43,896INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7082.
>> 2023-03-0718:37:43,906INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7083.
>> 2023-03-0718:37:43,928INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7084.
>> 2023-03-0718:37:43,936INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7085.
>> 2023-03-0718:37:43,947INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7086.
>>
>>
>>
>> 详细日志为,后面重复部分我给省略了,不同点就是尝试不同的/tmp/flink/ha/application_1678102326043_0007/completedCheckpointxxxxxx启动
>> 2023-03-0718:37:43,621INFOorg.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
>> - Starting the SlotManager.
>> 2023-03-0718:37:43,621INFOorg.apache.flink.runtime.jobmaster.JobMaster -
>> Successfully ran initialization on master in 0 ms.
>> 2023-03-0718:37:43,660INFOorg.apache.flink.runtime.util.ZooKeeperUtils -
>> Initialized ZooKeeperCompletedCheckpointStore in
>> '/checkpoints/3844b96b002601d932e66233dd46899c'.
>> 2023-03-0718:37:43,680INFOorg.apache.flink.runtime.jobmaster.JobMaster -
>> Using application-defined state backend: File State Backend (checkpoints:
>> 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED,
>> fileStateThreshold: -1)
>> 2023-03-0718:37:43,680INFOorg.apache.flink.runtime.jobmaster.JobMaster -
>> Configuring application-defined state backend with job/cluster config
>> 2023-03-0718:37:43,703INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Recovering checkpoints from ZooKeeper.
>> 2023-03-0718:37:43,730INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Found 10 checkpoints in ZooKeeper.
>> 2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to fetch 10 checkpoints from storage.
>> 2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7079.
>> 2023-03-0718:37:43,837INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7080.
>> 2023-03-0718:37:43,868INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7081.
>> 2023-03-0718:37:43,896INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7082.
>> 2023-03-0718:37:43,906INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7083.
>> 2023-03-0718:37:43,928INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7084.
>> 2023-03-0718:37:43,936INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7085.
>> 2023-03-0718:37:43,947INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7086.
>> 2023-03-0718:37:43,979WARNorg.apache.hadoop.hdfs.BlockReaderFactory - I/O
>> error constructing remote block reader.
>> java.io.IOException: Got error, status message opReadBlock
>> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
>> received exception
>> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
>> The meta file length 0 is less than the expected length 7, for
>> OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866,
>> for file
>> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
>> for pool BP-1003103929-192.168.200.11-1668473836936 block
>> 1301252639_227512278
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
>> at
>> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
>> at
>> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>> at
>> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:818)
>> at
>> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:697)
>> at
>> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:665)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:874)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926)
>> at java.io.DataInputStream.read(DataInputStream.java:149)
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
>> at
>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2663)
>> at
>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2679)
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156)
>> at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
>> at java.io.ObjectInputStream.<init>(ObjectInputStream.java:358)
>> at
>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:69)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:572)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
>> at
>> org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
>> at
>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:339)
>> at
>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1070)
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:234)
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
>> at
>> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
>> at
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>> at
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
>> at
>> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-03-10 11:00:58,"Weihua Hu" <hu...@gmail.com> 写道:
>> >Hi
>> >
>> >一般来说只是 YARN 集群异常停电不会影响已经完成的历史 Checkpoint(最后一次 Checkpoint 可能会写 hdfs 异常)
>> >
>> >有更详细的 JobManager 日志吗?可以先确认下 Flink 在恢复时检索到了多少个 completedCheckpoint
>> >以及最终尝试从哪一次 cp 恢复的。
>> >
>> >也可以尝试按照 Yanfei 所说指定历史的 cp 作为 savepoint 恢复
>> >
>> >
>> >Best,
>> >Weihua
>> >
>> >
>> >On Fri, Mar 10, 2023 at 10:38 AM guanyq <dl...@163.com> wrote:
>> >
>> >> 没有开启增量chk
>> >> 文件损坏是看了启动日志,启动日志尝试从10个chk启动,但是都因为以下块损坏启动失败了
>> >> 错误日志为:
>> >>
>> >> java.io.IOException: Got error, status message opReadBlock
>> >> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
>> >> received exception
>> >> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
>> >> The meta file length 0 is less than the expected length 7, for
>> >> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
>> >> for file
>> >>
>> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
>> >> for pool BP-1003103929-192.168.200.11-1668473836936 block
>> >> 1301252639_227512278
>> >> at
>> >>
>> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
>> >> at
>> >>
>> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
>> >> at
>> >>
>> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2023-03-10 10:26:11,"Yanfei Lei" <fr...@gmail.com> 写道:
>> >> >Hi 可以尝试去flink配置的checkpoint dir下面去找一找历史chk-x文件夹,如果能找到历史的chk-x,可以尝试手工指定
>> >> chk重启[1]。
>> >> >
>> >> >> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
>> >>
>> >>
>> >请问作业开启增量checkpoint了吗?在开启了增量的情况下,如果是比较早的一个checkpoint的文件损坏了,会影响后续基于它进行增量的checkpoint。
>> >> >
>> >> >> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
>> >> >是观察到checkpoint dir下面没有文件吗?
>> >> >
>> >> >[1]
>> >>
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
>> >> >
>> >> >guanyq <dl...@163.com> 于2023年3月10日周五 08:58写道:
>> >> >>
>> >> >> 目前也想着用savepoint处理异常停电的问题
>> >> >> 但是我这面还有个疑问:
>> >> >> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
>> >> >> 就很奇怪是不是10个checkpoint都没落盘导致的。
>> >> >> 想问下:
>> >> >> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2023-03-10 08:47:11,"Shammon FY" <zj...@gmail.com> 写道:
>> >> >> >Hi
>> >> >> >
>> >> >> >我觉得Flink
>> >> >>
>> >>
>> >作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业
>> >> >> >
>> >> >> >Best,
>> >> >> >Shammon
>> >> >> >
>> >> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
>> >> >> >
>> >> >> >> 前提
>> >> >> >> 1.flink配置了高可用
>> >> >> >> 2.flink配置checkpoint数为10
>> >> >> >> 3.yarn集群配置了任务恢复
>> >> >> >> 疑问
>> >> >> >>
>> yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >
>> >> >
>> >> >
>> >> >--
>> >> >Best,
>> >> >Yanfei
>> >>
>>

Re: Re: Re: Re: flink on yarn 异常停电问题咨询

Posted by Guojun Li <gj...@gmail.com>.
Hi

确认一下这些 ha 文件的 last modification time 是一致的还是错开的?

另外,指定 chk- 恢复尝试了没有?可以恢复吗?

Best,
Guojun

On Fri, Mar 10, 2023 at 11:56 AM guanyq <dl...@163.com> wrote:

> flink ha路径为 /tmp/flink/ha/
> flink chk路径为 /tmp/flink/checkpoint
>
>
> 我现在不确定是这个ha的文件损坏了,还是所有chk都损坏,但是这个需要模拟验证一下。
>
>
>
>
> 会尝试从10个chk恢复,日志有打印
> 2023-03-0718:37:43,703INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Recovering checkpoints from ZooKeeper.
> 2023-03-0718:37:43,730INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Found 10 checkpoints in ZooKeeper.
> 2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to fetch 10 checkpoints from storage.
> 2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7079.
> 2023-03-0718:37:43,837INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7080.
> 2023-03-0718:37:43,868INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7081.
> 2023-03-0718:37:43,896INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7082.
> 2023-03-0718:37:43,906INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7083.
> 2023-03-0718:37:43,928INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7084.
> 2023-03-0718:37:43,936INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7085.
> 2023-03-0718:37:43,947INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7086.
>
>
>
> 详细日志为,后面重复部分我给省略了,不同点就是尝试不同的/tmp/flink/ha/application_1678102326043_0007/completedCheckpointxxxxxx启动
> 2023-03-0718:37:43,621INFOorg.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
> - Starting the SlotManager.
> 2023-03-0718:37:43,621INFOorg.apache.flink.runtime.jobmaster.JobMaster -
> Successfully ran initialization on master in 0 ms.
> 2023-03-0718:37:43,660INFOorg.apache.flink.runtime.util.ZooKeeperUtils -
> Initialized ZooKeeperCompletedCheckpointStore in
> '/checkpoints/3844b96b002601d932e66233dd46899c'.
> 2023-03-0718:37:43,680INFOorg.apache.flink.runtime.jobmaster.JobMaster -
> Using application-defined state backend: File State Backend (checkpoints:
> 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED,
> fileStateThreshold: -1)
> 2023-03-0718:37:43,680INFOorg.apache.flink.runtime.jobmaster.JobMaster -
> Configuring application-defined state backend with job/cluster config
> 2023-03-0718:37:43,703INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Recovering checkpoints from ZooKeeper.
> 2023-03-0718:37:43,730INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Found 10 checkpoints in ZooKeeper.
> 2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to fetch 10 checkpoints from storage.
> 2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7079.
> 2023-03-0718:37:43,837INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7080.
> 2023-03-0718:37:43,868INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7081.
> 2023-03-0718:37:43,896INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7082.
> 2023-03-0718:37:43,906INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7083.
> 2023-03-0718:37:43,928INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7084.
> 2023-03-0718:37:43,936INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7085.
> 2023-03-0718:37:43,947INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7086.
> 2023-03-0718:37:43,979WARNorg.apache.hadoop.hdfs.BlockReaderFactory - I/O
> error constructing remote block reader.
> java.io.IOException: Got error, status message opReadBlock
> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> received exception
> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> The meta file length 0 is less than the expected length 7, for
> OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866,
> for file
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> for pool BP-1003103929-192.168.200.11-1668473836936 block
> 1301252639_227512278
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:818)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:697)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
> at
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:665)
> at
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:874)
> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
> at
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2663)
> at
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2679)
> at
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156)
> at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
> at java.io.ObjectInputStream.<init>(ObjectInputStream.java:358)
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:69)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:572)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
> at
> org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
> at
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:339)
> at
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1070)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:234)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
> at
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-03-10 11:00:58,"Weihua Hu" <hu...@gmail.com> 写道:
> >Hi
> >
> >一般来说只是 YARN 集群异常停电不会影响已经完成的历史 Checkpoint(最后一次 Checkpoint 可能会写 hdfs 异常)
> >
> >有更详细的 JobManager 日志吗?可以先确认下 Flink 在恢复时检索到了多少个 completedCheckpoint
> >以及最终尝试从哪一次 cp 恢复的。
> >
> >也可以尝试按照 Yanfei 所说指定历史的 cp 作为 savepoint 恢复
> >
> >
> >Best,
> >Weihua
> >
> >
> >On Fri, Mar 10, 2023 at 10:38 AM guanyq <dl...@163.com> wrote:
> >
> >> 没有开启增量chk
> >> 文件损坏是看了启动日志,启动日志尝试从10个chk启动,但是都因为以下块损坏启动失败了
> >> 错误日志为:
> >>
> >> java.io.IOException: Got error, status message opReadBlock
> >> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> >> received exception
> >> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> >> The meta file length 0 is less than the expected length 7, for
> >> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
> >> for file
> >>
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> >> for pool BP-1003103929-192.168.200.11-1668473836936 block
> >> 1301252639_227512278
> >> at
> >>
> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
> >> at
> >>
> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
> >> at
> >>
> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2023-03-10 10:26:11,"Yanfei Lei" <fr...@gmail.com> 写道:
> >> >Hi 可以尝试去flink配置的checkpoint dir下面去找一找历史chk-x文件夹,如果能找到历史的chk-x,可以尝试手工指定
> >> chk重启[1]。
> >> >
> >> >> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
> >>
> >>
> >请问作业开启增量checkpoint了吗?在开启了增量的情况下,如果是比较早的一个checkpoint的文件损坏了,会影响后续基于它进行增量的checkpoint。
> >> >
> >> >> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
> >> >是观察到checkpoint dir下面没有文件吗?
> >> >
> >> >[1]
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
> >> >
> >> >guanyq <dl...@163.com> 于2023年3月10日周五 08:58写道:
> >> >>
> >> >> 目前也想着用savepoint处理异常停电的问题
> >> >> 但是我这面还有个疑问:
> >> >> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
> >> >> 就很奇怪是不是10个checkpoint都没落盘导致的。
> >> >> 想问下:
> >> >> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2023-03-10 08:47:11,"Shammon FY" <zj...@gmail.com> 写道:
> >> >> >Hi
> >> >> >
> >> >> >我觉得Flink
> >> >>
> >>
> >作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业
> >> >> >
> >> >> >Best,
> >> >> >Shammon
> >> >> >
> >> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
> >> >> >
> >> >> >> 前提
> >> >> >> 1.flink配置了高可用
> >> >> >> 2.flink配置checkpoint数为10
> >> >> >> 3.yarn集群配置了任务恢复
> >> >> >> 疑问
> >> >> >>
> yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >
> >> >
> >> >
> >> >--
> >> >Best,
> >> >Yanfei
> >>
>

Re:Re: Re: Re: flink on yarn 异常停电问题咨询

Posted by guanyq <dl...@163.com>.
flink ha路径为 /tmp/flink/ha/
flink chk路径为 /tmp/flink/checkpoint


我现在不确定是这个ha的文件损坏了,还是所有chk都损坏,但是这个需要模拟验证一下。




会尝试从10个chk恢复,日志有打印
2023-03-0718:37:43,703INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2023-03-0718:37:43,730INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
2023-03-0718:37:43,837INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
2023-03-0718:37:43,868INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
2023-03-0718:37:43,896INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
2023-03-0718:37:43,906INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
2023-03-0718:37:43,928INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
2023-03-0718:37:43,936INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
2023-03-0718:37:43,947INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.


详细日志为,后面重复部分我给省略了,不同点就是尝试不同的/tmp/flink/ha/application_1678102326043_0007/completedCheckpointxxxxxx启动
2023-03-0718:37:43,621INFOorg.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
2023-03-0718:37:43,621INFOorg.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
2023-03-0718:37:43,660INFOorg.apache.flink.runtime.util.ZooKeeperUtils - Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/3844b96b002601d932e66233dd46899c'.
2023-03-0718:37:43,680INFOorg.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
2023-03-0718:37:43,680INFOorg.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
2023-03-0718:37:43,703INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2023-03-0718:37:43,730INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
2023-03-0718:37:43,731INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
2023-03-0718:37:43,837INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
2023-03-0718:37:43,868INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
2023-03-0718:37:43,896INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
2023-03-0718:37:43,906INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
2023-03-0718:37:43,928INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
2023-03-0718:37:43,936INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
2023-03-0718:37:43,947INFOorg.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
2023-03-0718:37:43,979WARNorg.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception
org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866,
for file /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
at org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:818)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:697)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:665)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:874)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2663)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2679)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:358)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:69)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:572)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:339)
at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1070)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:234)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

















在 2023-03-10 11:00:58,"Weihua Hu" <hu...@gmail.com> 写道:
>Hi
>
>一般来说只是 YARN 集群异常停电不会影响已经完成的历史 Checkpoint(最后一次 Checkpoint 可能会写 hdfs 异常)
>
>有更详细的 JobManager 日志吗?可以先确认下 Flink 在恢复时检索到了多少个 completedCheckpoint
>以及最终尝试从哪一次 cp 恢复的。
>
>也可以尝试按照 Yanfei 所说指定历史的 cp 作为 savepoint 恢复
>
>
>Best,
>Weihua
>
>
>On Fri, Mar 10, 2023 at 10:38 AM guanyq <dl...@163.com> wrote:
>
>> 没有开启增量chk
>> 文件损坏是看了启动日志,启动日志尝试从10个chk启动,但是都因为以下块损坏启动失败了
>> 错误日志为:
>>
>> java.io.IOException: Got error, status message opReadBlock
>> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
>> received exception
>> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
>> The meta file length 0 is less than the expected length 7, for
>> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
>> for file
>> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
>> for pool BP-1003103929-192.168.200.11-1668473836936 block
>> 1301252639_227512278
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
>> at
>> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
>> at
>> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-03-10 10:26:11,"Yanfei Lei" <fr...@gmail.com> 写道:
>> >Hi 可以尝试去flink配置的checkpoint dir下面去找一找历史chk-x文件夹,如果能找到历史的chk-x,可以尝试手工指定
>> chk重启[1]。
>> >
>> >> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
>>
>> >请问作业开启增量checkpoint了吗?在开启了增量的情况下,如果是比较早的一个checkpoint的文件损坏了,会影响后续基于它进行增量的checkpoint。
>> >
>> >> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
>> >是观察到checkpoint dir下面没有文件吗?
>> >
>> >[1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
>> >
>> >guanyq <dl...@163.com> 于2023年3月10日周五 08:58写道:
>> >>
>> >> 目前也想着用savepoint处理异常停电的问题
>> >> 但是我这面还有个疑问:
>> >> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
>> >> 就很奇怪是不是10个checkpoint都没落盘导致的。
>> >> 想问下:
>> >> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2023-03-10 08:47:11,"Shammon FY" <zj...@gmail.com> 写道:
>> >> >Hi
>> >> >
>> >> >我觉得Flink
>> >>
>> >作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业
>> >> >
>> >> >Best,
>> >> >Shammon
>> >> >
>> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
>> >> >
>> >> >> 前提
>> >> >> 1.flink配置了高可用
>> >> >> 2.flink配置checkpoint数为10
>> >> >> 3.yarn集群配置了任务恢复
>> >> >> 疑问
>> >> >> yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >
>> >
>> >
>> >--
>> >Best,
>> >Yanfei
>>

Re:Re: Re: flink on yarn关于yarn尝试重启flink job问题咨询

Posted by guanyq <dl...@163.com>.


理解了,非常感谢。








在 2023-03-13 16:57:18,"Weihua Hu" <hu...@gmail.com> 写道:
>图片看不到,可以找一个图床上传图片,在邮件列表中贴一下链接。
>
>YARN 拉起 AM 还受 "yarn.application-attempt-failures-validity-interval"[1]
>控制,在这个时间内达到指定次数才会退出。
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 4:27 PM guanyq <dl...@163.com> wrote:
>
>> 图片在附件
>> 但是实际却是超过了10次。。
>>
>>
>>
>>
>>
>>
>> 在 2023-03-13 15:39:39,"Weihua Hu" <hu...@gmail.com> 写道:
>> >Hi,
>> >
>> >图片看不到了
>> >
>> >按照这个配置,YARN 应该只会拉起 10 次 JobManager。
>> >
>> >Best,
>> >Weihua
>> >
>> >
>> >On Mon, Mar 13, 2023 at 3:32 PM guanyq <dl...@163.com> wrote:
>> >
>> >> flink1.10版本,flink配置如下
>> >> yarn.application-attempts = 10  (yarn尝试启动flink job的次数为10)
>> >> 正常我理解yarn会尝试10次启动flink job,如果起不来应该就会失败,但是在yarn应用页面看到了尝试11次,如下图
>> >> 请问appattempt_1678102326043_0006_000409
>> >> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
>> >> 每个序号不是代表一次尝试么
>> >>
>>
>>

Re: Re: flink on yarn关于yarn尝试重启flink job问题咨询

Posted by Weihua Hu <hu...@gmail.com>.
图片看不到,可以找一个图床上传图片,在邮件列表中贴一下链接。

YARN 拉起 AM 还受 "yarn.application-attempt-failures-validity-interval"[1]
控制,在这个时间内达到指定次数才会退出。

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval

Best,
Weihua


On Mon, Mar 13, 2023 at 4:27 PM guanyq <dl...@163.com> wrote:

> 图片在附件
> 但是实际却是超过了10次。。
>
>
>
>
>
>
> 在 2023-03-13 15:39:39,"Weihua Hu" <hu...@gmail.com> 写道:
> >Hi,
> >
> >图片看不到了
> >
> >按照这个配置,YARN 应该只会拉起 10 次 JobManager。
> >
> >Best,
> >Weihua
> >
> >
> >On Mon, Mar 13, 2023 at 3:32 PM guanyq <dl...@163.com> wrote:
> >
> >> flink1.10版本,flink配置如下
> >> yarn.application-attempts = 10  (yarn尝试启动flink job的次数为10)
> >> 正常我理解yarn会尝试10次启动flink job,如果起不来应该就会失败,但是在yarn应用页面看到了尝试11次,如下图
> >> 请问appattempt_1678102326043_0006_000409
> >> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
> >> 每个序号不是代表一次尝试么
> >>
>
>

Re:Re: flink on yarn关于yarn尝试重启flink job问题咨询

Posted by guanyq <dl...@163.com>.
图片在附件
但是实际却是超过了10次。。
















在 2023-03-13 15:39:39,"Weihua Hu" <hu...@gmail.com> 写道:
>Hi,
>
>图片看不到了
>
>按照这个配置,YARN 应该只会拉起 10 次 JobManager。
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 3:32 PM guanyq <dl...@163.com> wrote:
>
>> flink1.10版本,flink配置如下
>> yarn.application-attempts = 10  (yarn尝试启动flink job的次数为10)
>> 正常我理解yarn会尝试10次启动flink job,如果起不来应该就会失败,但是在yarn应用页面看到了尝试11次,如下图
>> 请问appattempt_1678102326043_0006_000409
>> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
>> 每个序号不是代表一次尝试么
>>

Re: flink on yarn关于yarn尝试重启flink job问题咨询

Posted by Weihua Hu <hu...@gmail.com>.
Hi,

图片看不到了

按照这个配置,YARN 应该只会拉起 10 次 JobManager。

Best,
Weihua


On Mon, Mar 13, 2023 at 3:32 PM guanyq <dl...@163.com> wrote:

> flink1.10版本,flink配置如下
> yarn.application-attempts = 10  (yarn尝试启动flink job的次数为10)
> 正常我理解yarn会尝试10次启动flink job,如果起不来应该就会失败,但是在yarn应用页面看到了尝试11次,如下图
> 请问appattempt_1678102326043_0006_000409
> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
> 每个序号不是代表一次尝试么
>

flink on yarn关于yarn尝试重启flink job问题咨询

Posted by guanyq <dl...@163.com>.
flink1.10版本,flink配置如下
yarn.application-attempts = 10  (yarn尝试启动flink job的次数为10)
正常我理解yarn会尝试10次启动flink job,如果起不来应该就会失败,但是在yarn应用页面看到了尝试11次,如下图
请问appattempt_1678102326043_0006_000409每个序号不是代表一次尝试么

Re: Re: Re: flink on yarn 异常停电问题咨询

Posted by Weihua Hu <hu...@gmail.com>.
Hi

一般来说只是 YARN 集群异常停电不会影响已经完成的历史 Checkpoint(最后一次 Checkpoint 可能会写 hdfs 异常)

有更详细的 JobManager 日志吗?可以先确认下 Flink 在恢复时检索到了多少个 completedCheckpoint
以及最终尝试从哪一次 cp 恢复的。

也可以尝试按照 Yanfei 所说指定历史的 cp 作为 savepoint 恢复


Best,
Weihua


On Fri, Mar 10, 2023 at 10:38 AM guanyq <dl...@163.com> wrote:

> 没有开启增量chk
> 文件损坏是看了启动日志,启动日志尝试从10个chk启动,但是都因为以下块损坏启动失败了
> 错误日志为:
>
> java.io.IOException: Got error, status message opReadBlock
> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> received exception
> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> The meta file length 0 is less than the expected length 7, for
> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
> for file
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> for pool BP-1003103929-192.168.200.11-1668473836936 block
> 1301252639_227512278
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-03-10 10:26:11,"Yanfei Lei" <fr...@gmail.com> 写道:
> >Hi 可以尝试去flink配置的checkpoint dir下面去找一找历史chk-x文件夹,如果能找到历史的chk-x,可以尝试手工指定
> chk重启[1]。
> >
> >> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
>
> >请问作业开启增量checkpoint了吗?在开启了增量的情况下,如果是比较早的一个checkpoint的文件损坏了,会影响后续基于它进行增量的checkpoint。
> >
> >> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
> >是观察到checkpoint dir下面没有文件吗?
> >
> >[1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
> >
> >guanyq <dl...@163.com> 于2023年3月10日周五 08:58写道:
> >>
> >> 目前也想着用savepoint处理异常停电的问题
> >> 但是我这面还有个疑问:
> >> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
> >> 就很奇怪是不是10个checkpoint都没落盘导致的。
> >> 想问下:
> >> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2023-03-10 08:47:11,"Shammon FY" <zj...@gmail.com> 写道:
> >> >Hi
> >> >
> >> >我觉得Flink
> >>
> >作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业
> >> >
> >> >Best,
> >> >Shammon
> >> >
> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
> >> >
> >> >> 前提
> >> >> 1.flink配置了高可用
> >> >> 2.flink配置checkpoint数为10
> >> >> 3.yarn集群配置了任务恢复
> >> >> 疑问
> >> >> yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
> >> >>
> >> >>
> >> >>
> >> >>
> >
> >
> >
> >--
> >Best,
> >Yanfei
>

Re:Re: Re: flink on yarn 异常停电问题咨询

Posted by guanyq <dl...@163.com>.
没有开启增量chk
文件损坏是看了启动日志,启动日志尝试从10个chk启动,但是都因为以下块损坏启动失败了
错误日志为:

java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception
org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
for file /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
at org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
















在 2023-03-10 10:26:11,"Yanfei Lei" <fr...@gmail.com> 写道:
>Hi 可以尝试去flink配置的checkpoint dir下面去找一找历史chk-x文件夹,如果能找到历史的chk-x,可以尝试手工指定 chk重启[1]。
>
>> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
>请问作业开启增量checkpoint了吗?在开启了增量的情况下,如果是比较早的一个checkpoint的文件损坏了,会影响后续基于它进行增量的checkpoint。
>
>> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
>是观察到checkpoint dir下面没有文件吗?
>
>[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
>
>guanyq <dl...@163.com> 于2023年3月10日周五 08:58写道:
>>
>> 目前也想着用savepoint处理异常停电的问题
>> 但是我这面还有个疑问:
>> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
>> 就很奇怪是不是10个checkpoint都没落盘导致的。
>> 想问下:
>> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-03-10 08:47:11,"Shammon FY" <zj...@gmail.com> 写道:
>> >Hi
>> >
>> >我觉得Flink
>> >作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业
>> >
>> >Best,
>> >Shammon
>> >
>> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
>> >
>> >> 前提
>> >> 1.flink配置了高可用
>> >> 2.flink配置checkpoint数为10
>> >> 3.yarn集群配置了任务恢复
>> >> 疑问
>> >> yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
>> >>
>> >>
>> >>
>> >>
>
>
>
>-- 
>Best,
>Yanfei

Re: Re: flink on yarn 异常停电问题咨询

Posted by Yanfei Lei <fr...@gmail.com>.
Hi 可以尝试去flink配置的checkpoint dir下面去找一找历史chk-x文件夹,如果能找到历史的chk-x,可以尝试手工指定 chk重启[1]。

> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
请问作业开启增量checkpoint了吗?在开启了增量的情况下,如果是比较早的一个checkpoint的文件损坏了,会影响后续基于它进行增量的checkpoint。

> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
是观察到checkpoint dir下面没有文件吗?

[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints

guanyq <dl...@163.com> 于2023年3月10日周五 08:58写道:
>
> 目前也想着用savepoint处理异常停电的问题
> 但是我这面还有个疑问:
> flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
> 就很奇怪是不是10个checkpoint都没落盘导致的。
> 想问下:
> checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-03-10 08:47:11,"Shammon FY" <zj...@gmail.com> 写道:
> >Hi
> >
> >我觉得Flink
> >作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业
> >
> >Best,
> >Shammon
> >
> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
> >
> >> 前提
> >> 1.flink配置了高可用
> >> 2.flink配置checkpoint数为10
> >> 3.yarn集群配置了任务恢复
> >> 疑问
> >> yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
> >>
> >>
> >>
> >>



-- 
Best,
Yanfei

Re:Re: flink on yarn 异常停电问题咨询

Posted by guanyq <dl...@163.com>.
目前也想着用savepoint处理异常停电的问题
但是我这面还有个疑问:
flink任务是10个checkpoint,每个checkpoint间隔5秒,如果突然停电,为什么所有的checkpoint都损坏了。
就很奇怪是不是10个checkpoint都没落盘导致的。
想问下:
checkpoint落盘的机制,这个应该和hdfs写入有关系,flink任务checkpoint成功,但是hdfs却没有落盘。

















在 2023-03-10 08:47:11,"Shammon FY" <zj...@gmail.com> 写道:
>Hi
>
>我觉得Flink
>作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业
>
>Best,
>Shammon
>
>On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
>
>> 前提
>> 1.flink配置了高可用
>> 2.flink配置checkpoint数为10
>> 3.yarn集群配置了任务恢复
>> 疑问
>> yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
>>
>>
>>
>>

Re: flink on yarn 异常停电问题咨询

Posted by Shammon FY <zj...@gmail.com>.
Hi

我觉得Flink
作业恢复失败时,作业本身很难确定失败是checkpoint的文件块损坏之类的原因。如果你的作业做过savepoint,可以尝试从指定的savepoint恢复作业

Best,
Shammon

On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:

> 前提
> 1.flink配置了高可用
> 2.flink配置checkpoint数为10
> 3.yarn集群配置了任务恢复
> 疑问
> yarn集群停电重启后,恢复flink任务时,如果最近的checkpoint由于停电导致块损坏,是否会尝试从其他checkpoint启动
>
>
>
>