Posted to user-zh@flink.apache.org by m18814122325 <m1...@163.com> on 2022/06/12 14:45:47 UTC

Flink K8s HA: unexpected behavior after manually deleting the job deployment

Flink version: 1.15.0

deploy mode: Native k8s application




Observed behavior:

I deployed a Flink job in Native K8s mode with Kubernetes-based HA. After I manually deleted the job's deployment, I found that the job's HA ConfigMaps still existed. When I then started the job again without the -s parameter, the startup logs showed that it recovered from the information recorded in those ConfigMaps.




kubectl delete deployment flink-bdra-sql-application-job -n  bdra-dev-flink-standalone




kubectl get configMap -n bdra-dev-flink-standalone




NAME                                                                          DATA   AGE
flink-bdra-sql-application-job-00000000000000000000000000000000-config-map   2      13m
flink-bdra-sql-application-job-cluster-config-map                             1      13m







I have the following questions:

1. For a Flink job that uses Kubernetes-based HA to work properly, it seems the job's deployment must not be deleted manually; the job has to be stopped with the cancel or stop command. Based on this, my guess is that Flink K8s HA is built on top of ConfigMaps, whose lifecycle, from the Kubernetes point of view, cannot carry an ownerReference the way the job's svc does. Is that guess correct?

2. For Flink jobs with Kubernetes-based HA, the job id is always 00000000000000000000000000000000. Why is that?

3. How does Flink K8s HA work, and what information does it store? I would like to study the implementation; if anyone has the design document or related material, I would appreciate a reply to this email. Thanks.




Restart command (without the -s parameter, i.e. the command itself does not request recovery from any checkpoint or savepoint):

flink run-application --target kubernetes-application -c CalculateUv -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p -Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 -Dkubernetes.namespace=bdra-dev-flink-standalone -Dkubernetes.service-account=bdra-dev-flink-standalone-sa -Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 -Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 -Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m -Dstate.backend=filesystem -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3 -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3 -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory -Dhigh-availability.storageDir=file:///opt/flink/log/recovery -Ds3.access-key=* -Ds3.secret-key=* -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory -Dmetrics.reporter.influxdb.scheme=http -Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 -Dmetrics.reporter.influxdb.db=flink_metrics -Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 -Dkubernetes.rest-service.exposed.type=ClusterIP -Dkubernetes.config.file=kube_config -Dkubernetes.pod-template-file=pod-template.yaml local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar
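
For comparison, a restart that explicitly restores from a savepoint would add the -s (--fromSavepoint) flag; a minimal sketch, assuming run-application honors -s the same way flink run does (the savepoint path below is hypothetical, all other options stay as in the command above):

flink run-application --target kubernetes-application -c CalculateUv \
  -s s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3/savepoint-xxxxxx \
  -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p \
  ...(remaining -D options unchanged from the command above)... \
  local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar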




After the restart, the job automatically recovered from the ConfigMap:

2022-06-10 20:20:52,592 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Successfully recovered 1 persisted job graphs.

2022-06-10 20:20:52,654 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 .

2022-06-10 20:20:53,552 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 pods from previous attempts, current attempt id is 1.

2022-06-10 20:20:53,552 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.

2022-06-10 20:20:55,352 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5.

2022-06-10 20:20:55,370 INFO  org.apache.flink.client.ClientUtils                          [] - Starting program (detached: false)

2022-06-10 20:20:55,394 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .

2022-06-10 20:20:55,438 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job 'insert-into_default_catalog.default_database.buy_cnt_per_hour' (00000000000000000000000000000000).

2022-06-10 20:20:55,477 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000).

2022-06-10 20:20:55,558 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job-00000000000000000000000000000000-config-map'}.

2022-06-10 20:20:55,572 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 1 checkpoints in KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job-00000000000000000000000000000000-config-map'}.

2022-06-10 20:20:55,572 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 1 checkpoints from storage.

2022-06-10 20:20:55,572 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to retrieve checkpoint 64.

2022-06-10 20:20:55,760 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000).

2022-06-10 20:20:55,760 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.

2022-06-10 20:20:56,254 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 2 ms, total 1 pipelined regions currently.

2022-06-10 20:20:56,266 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@5151a2cd

2022-06-10 20:20:56,267 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@5cfc3f54

2022-06-10 20:20:56,267 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend

2022-06-10 20:20:56,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@f3bc6a6

2022-06-10 20:20:57,763 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 00000000000000000000000000000000 from Checkpoint 64 @ 1654863136682 for 00000000000000000000000000000000 located at s3p://otsp-flink-lun01/flink-checkpoints/00000000000000000000000000000000/chk-64.

2022-06-10 20:20:57,797 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No master state to restore

2022-06-10 20:20:57,798 INFO  org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Resetting coordinator to checkpoint.

2022-06-10 20:20:57,839 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing SourceCoordinator for source Source: user_behavior[1].

2022-06-10 20:20:57,840 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: user_behavior[1] closed.

2022-06-10 20:20:57,847 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Restoring SplitEnumerator of source Source: user_behavior[1] from checkpoint.

2022-06-10 20:20:57,866 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@55d02910 for insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000).

2022-06-10 20:20:57,989 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='flink-bdra-sql-application-job-cluster-config-map'}.

2022-06-10 20:20:57,989 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for bdra-dev-flink-standalone/flink-bdra-sql-application-job-cluster-config-map, watching id:93e9b11c-a69c-425a-b35a-e65bc53ea5b1

2022-06-10 20:20:57,990 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job 'insert-into_default_catalog.default_database.buy_cnt_per_hour' (00000000000000000000000000000000) under job master id 92d47a56896398911c0078e0a2544608.

2022-06-10 20:20:57,996 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: user_behavior[1].

2022-06-10 20:20:57,998 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]

2022-06-10 20:20:57,999 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000) switched from state CREATED to RUNNING.

2022-06-10 20:20:58,049 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: user_behavior[1] -> Calc[2] -> LocalWindowAggregate[3] (1/8) (1b3b93d9f79b647fcc54d5253974d94f) switched from CREATED to SCHEDULED.

2022-06-10 20:20:58,050 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: user_behavior[1] -> Calc[2] -> LocalWindowAggregate[3] (2/8) (ebf049155617e3fc58d449eb9f3b0eb6) switched from CREATED to SCHEDULED.

2022-06-10 20:20:58,050 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: user_behavior[1] -> Calc[2] -> LocalWindowAggregate[3] (3/8) (118cb05f1cb95e9a569a1d99e5aef29e) switched from CREATED to SCHEDULED.



Re: Re: Flink K8s HA: unexpected behavior after manually deleting the job deployment

Posted by m18814122325 <m1...@163.com>.


Thanks to both of you for the replies!

Re: Flink K8s HA: unexpected behavior after manually deleting the job deployment

Posted by Yang Wang <da...@gmail.com>.
Zhanghao's answer is already very thorough; let me just add one small point: deleting the Deployment while keeping the HA ConfigMaps is the expected behavior, and it is documented [1].
There are two reasons for this design:
(1) The job may be restarted later with the same cluster-id and is expected to recover from the previous checkpoint.
(2) Simply deleting the ConfigMaps would leak the HA data stored on the DFS (e.g. HDFS, S3, OSS).

[1].
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
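
As a concrete illustration of the clean-up rule in [1]: stopping the job through Flink (flink cancel / flink stop) lets Flink remove the HA data itself; only when the cluster is gone for good and will not be restarted should the leftovers be removed by hand. A hedged sketch using the names from this thread (adjust namespace, ConfigMap names, and storageDir to your setup):

kubectl delete configmap -n bdra-dev-flink-standalone \
  flink-bdra-sql-application-job-00000000000000000000000000000000-config-map \
  flink-bdra-sql-application-job-cluster-config-map
# ...and also delete the files under high-availability.storageDir, otherwise the
# jobgraph/checkpoint metadata stored there is leaked.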


Best,
Yang


Re: Flink K8s HA: unexpected behavior after manually deleting the job deployment

Posted by Zhanghao Chen <zh...@outlook.com>.
1. For a Flink job that uses Kubernetes-based HA to work properly, it seems the job's deployment must not be deleted manually; the job has to be stopped with the cancel or stop command. Based on this, my guess is that Flink K8s HA is built on top of ConfigMaps, whose lifecycle, from the Kubernetes point of view, cannot carry an ownerReference the way the job's svc does.

Yes, Flink K8s HA is built on ConfigMaps, and the HA ConfigMaps have no ownerReference set. So if you want to restart the cluster while keeping the HA data, simply deleting the deployment is enough; after the restart it will recover from the latest checkpoint.
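
If you want to verify this yourself, a small sketch (ConfigMap and namespace names taken from this thread; the rest service name is assumed to follow the <cluster-id>-rest convention of native K8s deployments):

kubectl get configmap flink-bdra-sql-application-job-cluster-config-map \
  -n bdra-dev-flink-standalone -o jsonpath='{.metadata.ownerReferences}'   # expected: empty
kubectl get svc flink-bdra-sql-application-job-rest \
  -n bdra-dev-flink-standalone -o jsonpath='{.metadata.ownerReferences}'   # expected: references the JobManager Deployment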

2. For Flink jobs with Kubernetes-based HA, the job id is always 00000000000000000000000000000000.

In application mode with HA enabled, the Flink job id is always 00000000000000000000000000000000; this is independent of whether K8s HA specifically is used. The job id is the job's unique identifier, and the HA services use it to name and address the HA resources belonging to a single job (such as the persisted jobgraph and checkpoints). In application mode the jobgraph is generated on the JM. Without HA, a random job id is generated each time the jobgraph is created; with HA, a fixed job id is required (that is where the all-zero job id comes from), because otherwise a JM failover would generate a new, different job id that could not be associated with the earlier checkpoints, and the job would recover from a fresh state.
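
A small illustration with the names from the logs above: both the per-job HA ConfigMap and the checkpoint path embed the job id, so only a stable id lets a restarted JobManager find the old job's HA entries and checkpoints again:

flink-bdra-sql-application-job-00000000000000000000000000000000-config-map           # per-job HA ConfigMap, keyed by job id
s3p://otsp-flink-lun01/flink-checkpoints/00000000000000000000000000000000/chk-64     # checkpoint path restored in the log above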

3. How does Flink K8s HA work, and what information does it store? I would like to study the implementation; if anyone has the design document or related material, I would appreciate a reply to this email. Thanks.

You can take a look at the official blog post: https://flink.apache.org/2021/02/10/native-k8s-with-ha.html; for more detail, see the FLIP-144 design document: https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
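
If you just want to peek at what is stored, you can dump the job's HA ConfigMap (a sketch with the names from this thread; the exact keys vary between Flink versions):

kubectl get configmap \
  flink-bdra-sql-application-job-00000000000000000000000000000000-config-map \
  -n bdra-dev-flink-standalone -o yaml
# The data section mostly holds leader information plus pointers (state handles) to the
# persisted jobgraph and completed-checkpoint metadata, which live under
# high-availability.storageDir rather than in the ConfigMap itself.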


Best,
Zhanghao Chen