Posted to user-zh@flink.apache.org by 汪赟 <wa...@icloud.com.INVALID> on 2022/11/01 06:20:59 UTC

Re: Flink k8s operator HA deployment of a Flink Session Cluster: exception when submitting a job.

Unsubscribe

Sent from my iPhone

> On 2022/10/28 at 11:41, Weihua Hu <hu...@gmail.com> wrote:
> 
> Hi, Young
> 
> Your analysis is correct. The Flink Kubernetes operator talks to the Flink
> cluster through the rest service, and Kubernetes routes requests sent to
> that service to a random one of the JM Pods behind it. Job submission is
> split into three APIs: uploadJar, runJob, and deleteJar, which is why you
> see the related errors in the operator's log.
> 
> Perhaps you could create a jira issue to follow up on this problem.
> 
> Best,
> Weihua
> 
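The three-call flow described above can be sketched with a toy model (a sketch only, not actual operator or Flink code; the pod and jar names are made up): each REST call independently lands on one of the JM pods behind the Service, so a later call may hit a pod that never saw the upload.

```python
import random

class JobManager:
    """Stand-in for one JM pod's local flink-web-upload directory."""
    def __init__(self, name):
        self.name = name
        self.jars = set()

def submit_via_service(jms, jar, rng):
    """Simulate uploadJar/runJob/deleteJar, each routed to a random pod."""
    # POST /jars/upload -- lands on a random pod; jar is stored on its local disk
    rng.choice(jms).jars.add(jar)
    # POST /jars/:jarid/run -- may land on a pod that never saw the upload
    run_jm = rng.choice(jms)
    if jar not in run_jm.jars:
        return "runJob failed: jar does not exist on " + run_jm.name
    # DELETE /jars/:jarid -- same routing problem again
    delete_jm = rng.choice(jms)
    if jar not in delete_jm.jars:
        return "deleteJar failed: jar does not exist on " + delete_jm.name
    delete_jm.jars.discard(jar)
    return "ok"
```

With two pods this model fails at the run step about half the time and at the delete step a quarter of the time, matching the intermittent behavior reported below.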
> 
>> On Thu, Oct 27, 2022 at 6:51 PM Young Chen <ni...@outlook.com> wrote:
>> 
>> [Problem description]
>> 
>> The Flink k8s operator (v1.1.0) deployed a Flink Session Cluster in HA
>> mode (two JobManagers), and a SessionJob was then used to deploy an
>> example job. Sometimes the job deploys successfully, sometimes it does not.
>> 
>> The following error logs can be seen in the container.
>> 
>> 
>> 
>> [Steps to reproduce]
>> 
>> Deploy the cluster:
>> 
>> 
>> 
>> apiVersion: flink.apache.org/v1beta1
>> kind: FlinkDeployment
>> metadata:
>>   name: flink-cluster-1jm-checkpoint
>> spec:
>>   image: flink:1.15
>>   flinkVersion: v1_15
>>   flinkConfiguration:
>>     taskmanager.numberOfTaskSlots: "1"
>>     state.savepoints.dir: file:///flink-data/savepoints
>>     state.checkpoints.dir: file:///flink-data/checkpoints
>>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>     high-availability.storageDir: file:///flink-data/ha
>>     state.checkpoints.num-retained: "10"
>>   serviceAccount: flink
>>   ingress:
>>     template: "{{name}}.{{namespace}}.k8s.rf.io"
>>   jobManager:
>>     replicas: 2
>>   podTemplate:
>>     spec:
>>       nodeSelector:
>>         kubernetes.io/hostname: k8s17
>>       containers:
>>         - name: flink-main-container
>>           volumeMounts:
>>             - mountPath: /flink-data
>>               name: flink-volume
>>       volumes:
>>         - name: flink-volume
>>           hostPath:
>>             # directory location on host
>>             path: /tmp/flink
>>             # this field is optional
>>             type: Directory
>> 
>> 
>> 
>> Deploy the job:
>> 
>> 
>> 
>> apiVersion: flink.apache.org/v1beta1
>> kind: FlinkSessionJob
>> metadata:
>>   name: flink-job-1jm-checkpoint
>> spec:
>>   deploymentName: flink-cluster-1jm-checkpoint
>>   job:
>>     # the operator image I built myself includes the examples jars
>>     jarURI: file:///opt/flink/examples/streaming/StateMachineExample.jar
>>     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
>>     parallelism: 1
>>     upgradeMode: savepoint
>> 
>> 
>> 
>> 
>> 
>> [Related logs]
>> 
>>  1.  Operator log from a run where the job deployed successfully:
>> 
>> 2022-10-27 03:38:07,952 o.a.f.k.o.s.FlinkService
>> [ERROR][flink/flink-job-1jm-checkpoint] Failed to delete the jar:
>> 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar.
>> 
>> java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.rest.util.RestClientException:
>> [org.apache.flink.runtime.rest.handler.RestHandlerException: File
>> 06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar does not exist
>> in /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload.
>> 
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler.lambda$handleRequest$0(JarDeleteHandler.java:80)
>> 
>> at
>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
>> Source)
>> 
>> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
>> Source)
>> 
>> at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>> 
>> at
>> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>> Source)
>> 
>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
>> Source)
>> 
>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
>> Source)
>> 
>> at java.base/java.lang.Thread.run(Unknown Source)
>> 
>> ]
>> 
>> at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown
>> Source
>> 
>> One JobManager Pod does not have the file /tmp/flink-web-e78590cd-656d-4f6f-a16a-9e4e994b44a6/flink-web-upload/06c5b90e-865b-4a88-bf30-515ec122e51e_StateMachineExample.jar, while the other JM's Pod does. But that second JM should not be the leader, because the checkpoint-related log output appears in the first JM.
>> 
>> 
>> 
>> 
>> 
>>  2.  Operator log from a run where job deployment failed:
>> 
>> 2022-10-27 10:12:09,749 i.j.o.p.e.ReconciliationDispatcher
>> [ERROR][flink/flink-job-1jm-checkpoint] Error during event processing
>> ExecutionScope{ resource id: ResourceID{name='flink-job-1jm-checkpoint',
>> namespace='flink'}, version: 120505701} failed.
>> 
>> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
>> org.apache.flink.util.FlinkRuntimeException:
>> java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
>> error., <Exception on server side:
>> 
>> java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file
>> /tmp/flink-web-69209c8b-6ed5-45f2-aa99-4bc41efb7983/flink-web-upload/d7df9d81-2cfb-4642-a450-e9080a30db12_StateMachineExample.jar
>> does not exist
>> 
>> at
>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:172)
>> 
>> at
>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)
>> 
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:100)
>> 
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:57)
>> 
>> at
>> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
>> 
>> at
>> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
>> 
>> at
>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>> 
>> at java.base/java.util.Optional.ifPresent(Unknown Source)
>> 
>> Similar logs also appear in the leader JobManager's Pod.
>> 
>> 
>> 
>> [Analysis]
>> 
>> In HA mode, is it necessary to set the JM replicas this high (the
>> replicas field is configurable)? When there are multiple JMs, one of them
>> is the leader; how did the other JM end up receiving the uploaded jar?
>> 
>> The first case looks like this: the operator submitted the jar to one JM,
>> and that JM also got the job running; then the second JM became the
>> leader, and when the operator deleted the jar it connected to the second
>> JM, so only the jar deletion failed.
>> 
>> The second case is that the operator submitted the jar to one JM, but the
>> other JM is the leader, so the leader cannot find the jar when deploying
>> the job.
>> 
>> If one JM is the leader, is a WebUI request guaranteed to reach that Pod,
>> or can it also reach the other JM's Pod?
>> 
>> After uploading a jar through the WebUI, as the WebUI auto-refreshes, the
>> jar is sometimes visible and sometimes not. It looks like the two Pods
>> are hit at random.
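One way to avoid the mismatch described in the analysis above would be to pin all three submission calls to a single JM pod (for example via its pod IP) instead of going through the Service. A minimal sketch that only builds the REST URLs, assuming the standard Flink endpoints /jars/upload, /jars/:jarid/run, and DELETE /jars/:jarid (the pod address is a placeholder; 8081 is Flink's default rest port):

```python
# Build Flink REST URLs pinned to one JM pod instead of the round-robin
# Service, so that upload, run, and delete all hit the same
# flink-web-upload directory.
def pinned_jar_endpoints(pod_address, jar_id=None, port=8081):
    base = f"http://{pod_address}:{port}"
    endpoints = {"upload": f"{base}/jars/upload"}       # POST, multipart jar upload
    if jar_id is not None:
        endpoints["run"] = f"{base}/jars/{jar_id}/run"  # POST, starts the job
        endpoints["delete"] = f"{base}/jars/{jar_id}"   # DELETE, removes the jar
    return endpoints
```

Whether the operator itself can be configured to do this in v1.1.0 is not something this thread shows; the sketch only illustrates the idea of keeping all three calls on one pod.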
>> 
>> 
>> 
>> 
>> 
>>